Fixed several issues from code review
Most importantly wrote the Lock, Read_Lock_Instance, and Write_Lock_Instance classes in lock.py Updated the locking in database.py TODO: Lock on larger areas
This commit is contained in:
parent
9345e78779
commit
f406fcb843
136
lib/spack/llnl/util/lock.py
Normal file
136
lib/spack/llnl/util/lock.py
Normal file
@ -0,0 +1,136 @@
|
||||
##############################################################################
|
||||
# Copyright (c) 2013, Lawrence Livermore National Security, LLC.
|
||||
# Produced at the Lawrence Livermore National Laboratory.
|
||||
#
|
||||
# This file is part of Spack.
|
||||
# Written by Todd Gamblin, tgamblin@llnl.gov, All rights reserved.
|
||||
# LLNL-CODE-647188
|
||||
#
|
||||
# For details, see https://scalability-llnl.github.io/spack
|
||||
# Please also see the LICENSE file for our notice and the LGPL.
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License (as published by
|
||||
# the Free Software Foundation) version 2.1 dated February 1999.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful, but
|
||||
# WITHOUT ANY WARRANTY; without even the IMPLIED WARRANTY OF
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the terms and
|
||||
# conditions of the GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with this program; if not, write to the Free Software Foundation,
|
||||
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
##############################################################################
|
||||
import os
|
||||
import fcntl
|
||||
import errno
|
||||
import time
|
||||
import socket
|
||||
|
||||
|
||||
class Read_Lock_Instance(object):
|
||||
"""
|
||||
A context manager for getting shared access to the object lock
|
||||
Arguments are lock and timeout (default 5 minutes)
|
||||
"""
|
||||
def __init__(self,lock,timeout = 300):
|
||||
self._lock = lock
|
||||
self._timeout = timeout
|
||||
def __enter__(self):
|
||||
self._lock.acquire_read(self._timeout)
|
||||
def __exit__(self,type,value,traceback):
|
||||
self._lock.release_read()
|
||||
|
||||
|
||||
class Write_Lock_Instance(object):
|
||||
"""
|
||||
A context manager for getting exclusive access to the object lock
|
||||
Arguments are lock and timeout (default 5 minutes)
|
||||
"""
|
||||
def __init__(self,lock,timeout = 300):
|
||||
self._lock = lock
|
||||
self._timeout = timeout
|
||||
def __enter__(self):
|
||||
self._lock.acquire_write(self._timeout)
|
||||
def __exit__(self,type,value,traceback):
|
||||
self._lock.release_write()
|
||||
|
||||
|
||||
class Lock(object):
|
||||
def __init__(self,file_path):
|
||||
self._file_path = file_path
|
||||
self._fd = os.open(file_path,os.O_RDWR)
|
||||
self._reads = 0
|
||||
self._writes = 0
|
||||
|
||||
|
||||
def acquire_read(self,timeout):
|
||||
"""
|
||||
Implements recursive lock. If held in both read and write mode,
|
||||
the write lock will be maintained until all locks are released
|
||||
"""
|
||||
if self._reads == 0 and self._writes == 0:
|
||||
self._lock(fcntl.LOCK_SH,timeout)
|
||||
self._reads += 1
|
||||
|
||||
|
||||
def acquire_write(self,timeout):
|
||||
"""
|
||||
Implements recursive lock
|
||||
"""
|
||||
if self._writes == 0:
|
||||
self._lock(fcntl.LOCK_EX,timeout)
|
||||
self._writes += 1
|
||||
|
||||
|
||||
def _lock(self,op,timeout):
|
||||
"""
|
||||
The timeout is implemented using nonblocking flock()
|
||||
to avoid using signals for timing
|
||||
Write locks store pid and host information to the lock file
|
||||
Read locks do not store data
|
||||
"""
|
||||
total_time = 0
|
||||
while total_time < timeout:
|
||||
try:
|
||||
fcntl.flock(self._fd, op | fcntl.LOCK_NB)
|
||||
if op == fcntl.LOCK_EX:
|
||||
with open(self._file_path,'w') as f:
|
||||
f.write("pid = "+str(os.getpid())+", host = "+socket.getfqdn())
|
||||
return
|
||||
except IOError as error:
|
||||
if error.errno == errno.EAGAIN or error.errno == EACCES:
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
time.sleep(0.1)
|
||||
total_time += 0.1
|
||||
|
||||
|
||||
def release_read(self):
|
||||
"""
|
||||
Assert there is a lock of the right type to release, recursive lock
|
||||
"""
|
||||
assert self._reads > 0
|
||||
if self._reads == 1 and self._writes == 0:
|
||||
self._unlock()
|
||||
self._reads -= 1
|
||||
|
||||
|
||||
def release_write(self):
|
||||
"""
|
||||
Assert there is a lock of the right type to release, recursive lock
|
||||
"""
|
||||
assert self._writes > 0
|
||||
if self._writes == 1 and self._reads == 0:
|
||||
self._unlock()
|
||||
self._writes -= 1
|
||||
|
||||
|
||||
def _unlock(self):
|
||||
"""
|
||||
Releases the lock regardless of mode. Note that read locks may be
|
||||
masquerading as write locks at times, but this removes either.
|
||||
"""
|
||||
fcntl.flock(self._fd,fcntl.LOCK_UN)
|
@ -38,6 +38,7 @@
|
||||
import llnl.util.tty as tty
|
||||
from llnl.util.filesystem import join_path
|
||||
from llnl.util.lang import *
|
||||
from llnl.util.lock import *
|
||||
|
||||
import spack.error
|
||||
import spack.spec
|
||||
@ -58,7 +59,7 @@ def converter(self, spec_like, **kwargs):
|
||||
|
||||
|
||||
class Database(object):
|
||||
def __init__(self,root,file_name="specDB.yaml"):
|
||||
def __init__(self,root,file_name="_index.yaml"):
|
||||
"""
|
||||
Create an empty Database
|
||||
Location defaults to root/specDB.yaml
|
||||
@ -67,28 +68,31 @@ def __init__(self,root,file_name="specDB.yaml"):
|
||||
path: the path to the install of that package
|
||||
dep_hash: a hash of the dependence DAG for that package
|
||||
"""
|
||||
self.root = root
|
||||
self.file_name = file_name
|
||||
self.file_path = join_path(self.root,self.file_name)
|
||||
self._root = root
|
||||
self._file_name = file_name
|
||||
self._file_path = join_path(self._root,self._file_name)
|
||||
|
||||
self.lock_name = "db_lock"
|
||||
self.lock_path = join_path(self.root,self.lock_name)
|
||||
self._lock_name = "_db_lock"
|
||||
self._lock_path = join_path(self._root,self._lock_name)
|
||||
if not os.path.exists(self._lock_path):
|
||||
open(self._lock_path,'w').close()
|
||||
self.lock = Lock(self._lock_path)
|
||||
|
||||
self.data = []
|
||||
self.last_write_time = 0
|
||||
self._data = []
|
||||
self._last_write_time = 0
|
||||
|
||||
|
||||
def from_yaml(self,stream):
|
||||
def _read_from_yaml(self,stream):
|
||||
"""
|
||||
Fill database from YAML, do not maintain old data
|
||||
Translate the spec portions from node-dict form to spec from
|
||||
Translate the spec portions from node-dict form to spec form
|
||||
"""
|
||||
try:
|
||||
file = yaml.load(stream)
|
||||
except MarkedYAMLError, e:
|
||||
raise SpackYAMLError("error parsing YAML database:", str(e))
|
||||
|
||||
if file==None:
|
||||
if file is None:
|
||||
return
|
||||
|
||||
data = {}
|
||||
@ -104,51 +108,34 @@ def from_yaml(self,stream):
|
||||
for idx in sph['deps']:
|
||||
sph['spec'].dependencies[data[idx]['spec'].name] = data[idx]['spec']
|
||||
|
||||
self.data = data.values()
|
||||
self._data = data.values()
|
||||
|
||||
|
||||
def read_database(self):
|
||||
"""
|
||||
Re-read Database from the data in the set location
|
||||
If the cache is fresh, return immediately.
|
||||
Implemented with mkdir locking for the database file.
|
||||
"""
|
||||
if not self.is_dirty():
|
||||
return
|
||||
|
||||
lock=0
|
||||
while lock==0:
|
||||
try:
|
||||
os.mkdir(self.lock_path)
|
||||
lock=1
|
||||
except OSError as err:
|
||||
pass
|
||||
|
||||
#The try statement ensures that a failure won't leave the
|
||||
#database locked to other processes.
|
||||
try:
|
||||
if os.path.isfile(self.file_path):
|
||||
with open(self.file_path,'r') as f:
|
||||
self.from_yaml(f)
|
||||
else:
|
||||
if os.path.isfile(self._file_path):
|
||||
with open(self._file_path,'r') as f:
|
||||
self._read_from_yaml(f)
|
||||
else:
|
||||
#The file doesn't exist, construct empty data.
|
||||
self.data = []
|
||||
except:
|
||||
os.rmdir(self.lock_path)
|
||||
raise
|
||||
|
||||
os.rmdir(self.lock_path)
|
||||
self._data = []
|
||||
|
||||
|
||||
def write_database_to_yaml(self,stream):
|
||||
def _write_database_to_yaml(self,stream):
|
||||
"""
|
||||
Replace each spec with its dict-node form
|
||||
Then stream all data to YAML
|
||||
"""
|
||||
node_list = []
|
||||
spec_list = [sph['spec'] for sph in self.data]
|
||||
spec_list = [sph['spec'] for sph in self._data]
|
||||
|
||||
for sph in self.data:
|
||||
for sph in self._data:
|
||||
node = {}
|
||||
deps = []
|
||||
for name,spec in sph['spec'].dependencies.items():
|
||||
@ -167,46 +154,23 @@ def write_database_to_yaml(self,stream):
|
||||
def write(self):
|
||||
"""
|
||||
Write the database to the standard location
|
||||
Implements mkdir locking for the database file
|
||||
Everywhere that the database is written it is read
|
||||
within the same lock, so there is no need to refresh
|
||||
the database within write()
|
||||
"""
|
||||
lock=0
|
||||
while lock==0:
|
||||
try:
|
||||
os.mkdir(self.lock_path)
|
||||
lock=1
|
||||
except OSError as err:
|
||||
pass
|
||||
|
||||
#The try statement ensures that a failure won't leave the
|
||||
#database locked to other processes.
|
||||
try:
|
||||
with open(self.file_path,'w') as f:
|
||||
self.last_write_time = int(time.time())
|
||||
self.write_database_to_yaml(f)
|
||||
except:
|
||||
os.rmdir(self.lock_path)
|
||||
raise
|
||||
|
||||
os.rmdir(self.lock_path)
|
||||
|
||||
|
||||
def get_index_of(self, spec):
|
||||
"""
|
||||
Returns the index of a spec in the database
|
||||
If unable to find the spec it returns -1
|
||||
"""
|
||||
for index, sph in enumerate(self.data):
|
||||
if sph['spec'] == spec:
|
||||
return index
|
||||
return -1
|
||||
|
||||
temp_name = os.getpid() + socket.getfqdn() + ".temp"
|
||||
temp_file = path.join(self._root,temp_name)
|
||||
with open(self.temp_path,'w') as f:
|
||||
self._last_write_time = int(time.time())
|
||||
self._write_database_to_yaml(f)
|
||||
os.rename(temp_name,self._file_path)
|
||||
|
||||
def is_dirty(self):
|
||||
"""
|
||||
Returns true iff the database file exists
|
||||
and was most recently written to by another spack instance.
|
||||
"""
|
||||
return (os.path.isfile(self.file_path) and (os.path.getmtime(self.file_path) > self.last_write_time))
|
||||
return (os.path.isfile(self._file_path) and (os.path.getmtime(self._file_path) > self._last_write_time))
|
||||
|
||||
|
||||
# @_autospec
|
||||
@ -215,16 +179,15 @@ def add(self, spec, path):
|
||||
Add the specified entry as a dict
|
||||
Write the database back to memory
|
||||
"""
|
||||
self.read_database()
|
||||
|
||||
sph = {}
|
||||
sph['spec']=spec
|
||||
sph['path']=path
|
||||
sph['hash']=spec.dag_hash()
|
||||
|
||||
self.data.append(sph)
|
||||
|
||||
self.write()
|
||||
with Write_Lock_Instance(self.lock,60):
|
||||
self.read_database()
|
||||
self._data.append(sph)
|
||||
self.write()
|
||||
|
||||
|
||||
@_autospec
|
||||
@ -234,13 +197,15 @@ def remove(self, spec):
|
||||
Searches for and removes the specified spec
|
||||
Writes the database back to memory
|
||||
"""
|
||||
self.read_database()
|
||||
with Write_Lock_Instance(self.lock,60):
|
||||
self.read_database()
|
||||
|
||||
for sp in self.data:
|
||||
if sp['hash'] == spec.dag_hash() and sp['spec'] == spec:
|
||||
self.data.remove(sp)
|
||||
for sp in self._data:
|
||||
#Not sure the hash comparison is necessary
|
||||
if sp['hash'] == spec.dag_hash() and sp['spec'] == spec:
|
||||
self._data.remove(sp)
|
||||
|
||||
self.write()
|
||||
self.write()
|
||||
|
||||
|
||||
@_autospec
|
||||
@ -272,10 +237,11 @@ def installed_package_specs(self):
|
||||
Read installed package names from the database
|
||||
and return their specs
|
||||
"""
|
||||
self.read_database()
|
||||
with Read_Lock_Instance(self.lock,60):
|
||||
self.read_database()
|
||||
|
||||
installed = []
|
||||
for sph in self.data:
|
||||
for sph in self._data:
|
||||
installed.append(sph['spec'])
|
||||
return installed
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user