Add typehints for directory_layout / Spec.prefix (#48652)

This commit is contained in:
Harmen Stoppels 2025-02-24 10:47:07 +01:00 committed by GitHub
parent 2f9ad5f34d
commit 974abc8067
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 72 additions and 69 deletions

View File

@ -25,7 +25,7 @@
} }
def _check_concrete(spec): def _check_concrete(spec: "spack.spec.Spec") -> None:
"""If the spec is not concrete, raise a ValueError""" """If the spec is not concrete, raise a ValueError"""
if not spec.concrete: if not spec.concrete:
raise ValueError("Specs passed to a DirectoryLayout must be concrete!") raise ValueError("Specs passed to a DirectoryLayout must be concrete!")
@ -51,7 +51,7 @@ def specs_from_metadata_dirs(root: str) -> List["spack.spec.Spec"]:
spec = _get_spec(prefix) spec = _get_spec(prefix)
if spec: if spec:
spec.prefix = prefix spec.set_prefix(prefix)
specs.append(spec) specs.append(spec)
continue continue
@ -84,7 +84,7 @@ class DirectoryLayout:
def __init__( def __init__(
self, self,
root, root: str,
*, *,
projections: Optional[Dict[str, str]] = None, projections: Optional[Dict[str, str]] = None,
hash_length: Optional[int] = None, hash_length: Optional[int] = None,
@ -120,17 +120,17 @@ def __init__(
self.manifest_file_name = "install_manifest.json" self.manifest_file_name = "install_manifest.json"
@property @property
def hidden_file_regexes(self): def hidden_file_regexes(self) -> Tuple[str]:
return ("^{0}$".format(re.escape(self.metadata_dir)),) return ("^{0}$".format(re.escape(self.metadata_dir)),)
def relative_path_for_spec(self, spec): def relative_path_for_spec(self, spec: "spack.spec.Spec") -> str:
_check_concrete(spec) _check_concrete(spec)
projection = spack.projections.get_projection(self.projections, spec) projection = spack.projections.get_projection(self.projections, spec)
path = spec.format_path(projection) path = spec.format_path(projection)
return str(Path(path)) return str(Path(path))
def write_spec(self, spec, path): def write_spec(self, spec: "spack.spec.Spec", path: str) -> None:
"""Write a spec out to a file.""" """Write a spec out to a file."""
_check_concrete(spec) _check_concrete(spec)
with open(path, "w", encoding="utf-8") as f: with open(path, "w", encoding="utf-8") as f:
@ -138,7 +138,7 @@ def write_spec(self, spec, path):
# the full provenance, so it's availabe if we want it later # the full provenance, so it's availabe if we want it later
spec.to_json(f, hash=ht.dag_hash) spec.to_json(f, hash=ht.dag_hash)
def write_host_environment(self, spec): def write_host_environment(self, spec: "spack.spec.Spec") -> None:
"""The host environment is a json file with os, kernel, and spack """The host environment is a json file with os, kernel, and spack
versioning. We use it in the case that an analysis later needs to versioning. We use it in the case that an analysis later needs to
easily access this information. easily access this information.
@ -148,7 +148,7 @@ def write_host_environment(self, spec):
with open(env_file, "w", encoding="utf-8") as fd: with open(env_file, "w", encoding="utf-8") as fd:
sjson.dump(environ, fd) sjson.dump(environ, fd)
def read_spec(self, path): def read_spec(self, path: str) -> "spack.spec.Spec":
"""Read the contents of a file and parse them as a spec""" """Read the contents of a file and parse them as a spec"""
try: try:
with open(path, encoding="utf-8") as f: with open(path, encoding="utf-8") as f:
@ -159,26 +159,28 @@ def read_spec(self, path):
# Too late for conversion; spec_file_path() already called. # Too late for conversion; spec_file_path() already called.
spec = spack.spec.Spec.from_yaml(f) spec = spack.spec.Spec.from_yaml(f)
else: else:
raise SpecReadError( raise SpecReadError(f"Did not recognize spec file extension: {extension}")
"Did not recognize spec file extension:" " {0}".format(extension)
)
except Exception as e: except Exception as e:
if spack.config.get("config:debug"): if spack.config.get("config:debug"):
raise raise
raise SpecReadError("Unable to read file: %s" % path, "Cause: " + str(e)) raise SpecReadError(f"Unable to read file: {path}", f"Cause: {e}")
# Specs read from actual installations are always concrete # Specs read from actual installations are always concrete
spec._mark_concrete() spec._mark_concrete()
return spec return spec
def spec_file_path(self, spec): def spec_file_path(self, spec: "spack.spec.Spec") -> str:
"""Gets full path to spec file""" """Gets full path to spec file"""
_check_concrete(spec) _check_concrete(spec)
yaml_path = os.path.join(self.metadata_path(spec), self._spec_file_name_yaml) yaml_path = os.path.join(self.metadata_path(spec), self._spec_file_name_yaml)
json_path = os.path.join(self.metadata_path(spec), self.spec_file_name) json_path = os.path.join(self.metadata_path(spec), self.spec_file_name)
return yaml_path if os.path.exists(yaml_path) else json_path return yaml_path if os.path.exists(yaml_path) else json_path
def deprecated_file_path(self, deprecated_spec, deprecator_spec=None): def deprecated_file_path(
self,
deprecated_spec: "spack.spec.Spec",
deprecator_spec: Optional["spack.spec.Spec"] = None,
) -> str:
"""Gets full path to spec file for deprecated spec """Gets full path to spec file for deprecated spec
If the deprecator_spec is provided, use that. Otherwise, assume If the deprecator_spec is provided, use that. Otherwise, assume
@ -212,16 +214,16 @@ def deprecated_file_path(self, deprecated_spec, deprecator_spec=None):
return yaml_path if os.path.exists(yaml_path) else json_path return yaml_path if os.path.exists(yaml_path) else json_path
def metadata_path(self, spec): def metadata_path(self, spec: "spack.spec.Spec") -> str:
return os.path.join(spec.prefix, self.metadata_dir) return os.path.join(spec.prefix, self.metadata_dir)
def env_metadata_path(self, spec): def env_metadata_path(self, spec: "spack.spec.Spec") -> str:
return os.path.join(self.metadata_path(spec), "install_environment.json") return os.path.join(self.metadata_path(spec), "install_environment.json")
def build_packages_path(self, spec): def build_packages_path(self, spec: "spack.spec.Spec") -> str:
return os.path.join(self.metadata_path(spec), self.packages_dir) return os.path.join(self.metadata_path(spec), self.packages_dir)
def create_install_directory(self, spec): def create_install_directory(self, spec: "spack.spec.Spec") -> None:
_check_concrete(spec) _check_concrete(spec)
# Create install directory with properly configured permissions # Create install directory with properly configured permissions
@ -239,7 +241,7 @@ def create_install_directory(self, spec):
self.write_spec(spec, self.spec_file_path(spec)) self.write_spec(spec, self.spec_file_path(spec))
def ensure_installed(self, spec): def ensure_installed(self, spec: "spack.spec.Spec") -> None:
""" """
Throws InconsistentInstallDirectoryError if: Throws InconsistentInstallDirectoryError if:
1. spec prefix does not exist 1. spec prefix does not exist
@ -266,7 +268,7 @@ def ensure_installed(self, spec):
"Spec file in %s does not match hash!" % spec_file_path "Spec file in %s does not match hash!" % spec_file_path
) )
def path_for_spec(self, spec): def path_for_spec(self, spec: "spack.spec.Spec") -> str:
"""Return absolute path from the root to a directory for the spec.""" """Return absolute path from the root to a directory for the spec."""
_check_concrete(spec) _check_concrete(spec)
@ -277,23 +279,13 @@ def path_for_spec(self, spec):
assert not path.startswith(self.root) assert not path.startswith(self.root)
return os.path.join(self.root, path) return os.path.join(self.root, path)
def remove_install_directory(self, spec, deprecated=False): def remove_install_directory(self, spec: "spack.spec.Spec", deprecated: bool = False) -> None:
"""Removes a prefix and any empty parent directories from the root. """Removes a prefix and any empty parent directories from the root.
Raised RemoveFailedError if something goes wrong. Raised RemoveFailedError if something goes wrong.
""" """
path = self.path_for_spec(spec) path = self.path_for_spec(spec)
assert path.startswith(self.root) assert path.startswith(self.root)
# Windows readonly files cannot be removed by Python
# directly, change permissions before attempting to remove
if sys.platform == "win32":
kwargs = {
"ignore_errors": False,
"onerror": fs.readonly_file_handler(ignore_errors=False),
}
else:
kwargs = {} # the default value for ignore_errors is false
if deprecated: if deprecated:
if os.path.exists(path): if os.path.exists(path):
try: try:
@ -304,7 +296,16 @@ def remove_install_directory(self, spec, deprecated=False):
raise RemoveFailedError(spec, path, e) from e raise RemoveFailedError(spec, path, e) from e
elif os.path.exists(path): elif os.path.exists(path):
try: try:
shutil.rmtree(path, **kwargs) if sys.platform == "win32":
# Windows readonly files cannot be removed by Python
# directly, change permissions before attempting to remove
shutil.rmtree(
path,
ignore_errors=False,
onerror=fs.readonly_file_handler(ignore_errors=False),
)
else:
shutil.rmtree(path)
except OSError as e: except OSError as e:
raise RemoveFailedError(spec, path, e) from e raise RemoveFailedError(spec, path, e) from e

View File

@ -10,7 +10,7 @@
import stat import stat
import sys import sys
import tempfile import tempfile
from typing import Callable, Dict, Optional from typing import Callable, Dict, List, Optional
from typing_extensions import Literal from typing_extensions import Literal
@ -78,7 +78,7 @@ def view_copy(
# Order of this dict is somewhat irrelevant # Order of this dict is somewhat irrelevant
prefix_to_projection = { prefix_to_projection = {
s.prefix: view.get_projection_for_spec(s) str(s.prefix): view.get_projection_for_spec(s)
for s in spec.traverse(root=True, order="breadth") for s in spec.traverse(root=True, order="breadth")
if not s.external if not s.external
} }
@ -185,7 +185,7 @@ def __init__(
def link(self, src: str, dst: str, spec: Optional[spack.spec.Spec] = None) -> None: def link(self, src: str, dst: str, spec: Optional[spack.spec.Spec] = None) -> None:
self._link(src, dst, self, spec) self._link(src, dst, self, spec)
def add_specs(self, *specs, **kwargs): def add_specs(self, *specs: spack.spec.Spec, **kwargs) -> None:
""" """
Add given specs to view. Add given specs to view.
@ -200,19 +200,19 @@ def add_specs(self, *specs, **kwargs):
""" """
raise NotImplementedError raise NotImplementedError
def add_standalone(self, spec): def add_standalone(self, spec: spack.spec.Spec) -> bool:
""" """
Add (link) a standalone package into this view. Add (link) a standalone package into this view.
""" """
raise NotImplementedError raise NotImplementedError
def check_added(self, spec): def check_added(self, spec: spack.spec.Spec) -> bool:
""" """
Check if the given concrete spec is active in this view. Check if the given concrete spec is active in this view.
""" """
raise NotImplementedError raise NotImplementedError
def remove_specs(self, *specs, **kwargs): def remove_specs(self, *specs: spack.spec.Spec, **kwargs) -> None:
""" """
Removes given specs from view. Removes given specs from view.
@ -231,25 +231,25 @@ def remove_specs(self, *specs, **kwargs):
""" """
raise NotImplementedError raise NotImplementedError
def remove_standalone(self, spec): def remove_standalone(self, spec: spack.spec.Spec) -> None:
""" """
Remove (unlink) a standalone package from this view. Remove (unlink) a standalone package from this view.
""" """
raise NotImplementedError raise NotImplementedError
def get_projection_for_spec(self, spec): def get_projection_for_spec(self, spec: spack.spec.Spec) -> str:
""" """
Get the projection in this view for a spec. Get the projection in this view for a spec.
""" """
raise NotImplementedError raise NotImplementedError
def get_all_specs(self): def get_all_specs(self) -> List[spack.spec.Spec]:
""" """
Get all specs currently active in this view. Get all specs currently active in this view.
""" """
raise NotImplementedError raise NotImplementedError
def get_spec(self, spec): def get_spec(self, spec: spack.spec.Spec) -> Optional[spack.spec.Spec]:
""" """
Return the actual spec linked in this view (i.e. do not look it up Return the actual spec linked in this view (i.e. do not look it up
in the database by name). in the database by name).
@ -263,7 +263,7 @@ def get_spec(self, spec):
""" """
raise NotImplementedError raise NotImplementedError
def print_status(self, *specs, **kwargs): def print_status(self, *specs: spack.spec.Spec, **kwargs) -> None:
""" """
Print a short summary about the given specs, detailing whether.. Print a short summary about the given specs, detailing whether..
* ..they are active in the view. * ..they are active in the view.
@ -694,7 +694,7 @@ def _sanity_check_view_projection(self, specs):
raise ConflictingSpecsError(current_spec, conflicting_spec) raise ConflictingSpecsError(current_spec, conflicting_spec)
seen[metadata_dir] = current_spec seen[metadata_dir] = current_spec
def add_specs(self, *specs: spack.spec.Spec) -> None: def add_specs(self, *specs, **kwargs) -> None:
"""Link a root-to-leaf topologically ordered list of specs into the view.""" """Link a root-to-leaf topologically ordered list of specs into the view."""
assert all((s.concrete for s in specs)) assert all((s.concrete for s in specs))
if len(specs) == 0: if len(specs) == 0:
@ -831,7 +831,7 @@ def get_projection_for_spec(self, spec):
##################### #####################
# utility functions # # utility functions #
##################### #####################
def get_spec_from_file(filename): def get_spec_from_file(filename) -> Optional[spack.spec.Spec]:
try: try:
with open(filename, "r", encoding="utf-8") as f: with open(filename, "r", encoding="utf-8") as f:
return spack.spec.Spec.from_yaml(f) return spack.spec.Spec.from_yaml(f)

View File

@ -2118,20 +2118,20 @@ def cshort_spec(self):
return self.cformat(spec_format) return self.cformat(spec_format)
@property @property
def prefix(self): def prefix(self) -> spack.util.prefix.Prefix:
if not self._concrete: if not self._concrete:
raise spack.error.SpecError("Spec is not concrete: " + str(self)) raise spack.error.SpecError(f"Spec is not concrete: {self}")
if self._prefix is None: if self._prefix is None:
upstream, record = spack.store.STORE.db.query_by_spec_hash(self.dag_hash()) _, record = spack.store.STORE.db.query_by_spec_hash(self.dag_hash())
if record and record.path: if record and record.path:
self.prefix = record.path self.set_prefix(record.path)
else: else:
self.prefix = spack.store.STORE.layout.path_for_spec(self) self.set_prefix(spack.store.STORE.layout.path_for_spec(self))
assert self._prefix is not None
return self._prefix return self._prefix
@prefix.setter def set_prefix(self, value: str) -> None:
def prefix(self, value):
self._prefix = spack.util.prefix.Prefix(llnl.path.convert_to_platform_path(value)) self._prefix = spack.util.prefix.Prefix(llnl.path.convert_to_platform_path(value))
def spec_hash(self, hash): def spec_hash(self, hash):
@ -2737,7 +2737,7 @@ def spec_and_dependency_types(
return spec_builder(spec_dict) return spec_builder(spec_dict)
@staticmethod @staticmethod
def from_dict(data): def from_dict(data) -> "Spec":
"""Construct a spec from JSON/YAML. """Construct a spec from JSON/YAML.
Args: Args:
@ -2760,7 +2760,7 @@ def from_dict(data):
return spec return spec
@staticmethod @staticmethod
def from_yaml(stream): def from_yaml(stream) -> "Spec":
"""Construct a spec from YAML. """Construct a spec from YAML.
Args: Args:
@ -2770,7 +2770,7 @@ def from_yaml(stream):
return Spec.from_dict(data) return Spec.from_dict(data)
@staticmethod @staticmethod
def from_json(stream): def from_json(stream) -> "Spec":
"""Construct a spec from JSON. """Construct a spec from JSON.
Args: Args:
@ -2780,7 +2780,7 @@ def from_json(stream):
data = sjson.load(stream) data = sjson.load(stream)
return Spec.from_dict(data) return Spec.from_dict(data)
except Exception as e: except Exception as e:
raise sjson.SpackJSONError("error parsing JSON spec:", str(e)) from e raise sjson.SpackJSONError("error parsing JSON spec:", e) from e
@staticmethod @staticmethod
def extract_json_from_clearsig(data): def extract_json_from_clearsig(data):

View File

@ -388,7 +388,7 @@ def test_wrapper_variables(
root = spack.concretize.concretize_one("dt-diamond") root = spack.concretize.concretize_one("dt-diamond")
for s in root.traverse(): for s in root.traverse():
s.prefix = "/{0}-prefix/".format(s.name) s.set_prefix(f"/{s.name}-prefix/")
dep_pkg = root["dt-diamond-left"].package dep_pkg = root["dt-diamond-left"].package
dep_lib_paths = ["/test/path/to/ex1.so", "/test/path/to/subdir/ex2.so"] dep_lib_paths = ["/test/path/to/ex1.so", "/test/path/to/subdir/ex2.so"]
@ -396,7 +396,7 @@ def test_wrapper_variables(
dep_libs = LibraryList(dep_lib_paths) dep_libs = LibraryList(dep_lib_paths)
dep2_pkg = root["dt-diamond-right"].package dep2_pkg = root["dt-diamond-right"].package
dep2_pkg.spec.prefix = str(installation_dir_with_headers) dep2_pkg.spec.set_prefix(str(installation_dir_with_headers))
setattr(dep_pkg, "libs", dep_libs) setattr(dep_pkg, "libs", dep_libs)
try: try:

View File

@ -403,8 +403,8 @@ def test_autoreconf_search_path_args_multiple(default_mock_concretization, tmpdi
aclocal_fst = str(tmpdir.mkdir("fst").mkdir("share").mkdir("aclocal")) aclocal_fst = str(tmpdir.mkdir("fst").mkdir("share").mkdir("aclocal"))
aclocal_snd = str(tmpdir.mkdir("snd").mkdir("share").mkdir("aclocal")) aclocal_snd = str(tmpdir.mkdir("snd").mkdir("share").mkdir("aclocal"))
build_dep_one, build_dep_two = spec.dependencies(deptype="build") build_dep_one, build_dep_two = spec.dependencies(deptype="build")
build_dep_one.prefix = str(tmpdir.join("fst")) build_dep_one.set_prefix(str(tmpdir.join("fst")))
build_dep_two.prefix = str(tmpdir.join("snd")) build_dep_two.set_prefix(str(tmpdir.join("snd")))
assert spack.build_systems.autotools._autoreconf_search_path_args(spec) == [ assert spack.build_systems.autotools._autoreconf_search_path_args(spec) == [
"-I", "-I",
aclocal_fst, aclocal_fst,
@ -422,8 +422,8 @@ def test_autoreconf_search_path_args_skip_automake(default_mock_concretization,
aclocal_snd = str(tmpdir.mkdir("snd").mkdir("share").mkdir("aclocal")) aclocal_snd = str(tmpdir.mkdir("snd").mkdir("share").mkdir("aclocal"))
build_dep_one, build_dep_two = spec.dependencies(deptype="build") build_dep_one, build_dep_two = spec.dependencies(deptype="build")
build_dep_one.name = "automake" build_dep_one.name = "automake"
build_dep_one.prefix = str(tmpdir.join("fst")) build_dep_one.set_prefix(str(tmpdir.join("fst")))
build_dep_two.prefix = str(tmpdir.join("snd")) build_dep_two.set_prefix(str(tmpdir.join("snd")))
assert spack.build_systems.autotools._autoreconf_search_path_args(spec) == ["-I", aclocal_snd] assert spack.build_systems.autotools._autoreconf_search_path_args(spec) == ["-I", aclocal_snd]
@ -434,7 +434,7 @@ def test_autoreconf_search_path_args_external_order(default_mock_concretization,
aclocal_snd = str(tmpdir.mkdir("snd").mkdir("share").mkdir("aclocal")) aclocal_snd = str(tmpdir.mkdir("snd").mkdir("share").mkdir("aclocal"))
build_dep_one, build_dep_two = spec.dependencies(deptype="build") build_dep_one, build_dep_two = spec.dependencies(deptype="build")
build_dep_one.external_path = str(tmpdir.join("fst")) build_dep_one.external_path = str(tmpdir.join("fst"))
build_dep_two.prefix = str(tmpdir.join("snd")) build_dep_two.set_prefix(str(tmpdir.join("snd")))
assert spack.build_systems.autotools._autoreconf_search_path_args(spec) == [ assert spack.build_systems.autotools._autoreconf_search_path_args(spec) == [
"-I", "-I",
aclocal_snd, aclocal_snd,
@ -447,8 +447,8 @@ def test_autoreconf_search_path_skip_nonexisting(default_mock_concretization, tm
"""Skip -I flags for non-existing directories""" """Skip -I flags for non-existing directories"""
spec = default_mock_concretization("dttop") spec = default_mock_concretization("dttop")
build_dep_one, build_dep_two = spec.dependencies(deptype="build") build_dep_one, build_dep_two = spec.dependencies(deptype="build")
build_dep_one.prefix = str(tmpdir.join("fst")) build_dep_one.set_prefix(str(tmpdir.join("fst")))
build_dep_two.prefix = str(tmpdir.join("snd")) build_dep_two.set_prefix(str(tmpdir.join("snd")))
assert spack.build_systems.autotools._autoreconf_search_path_args(spec) == [] assert spack.build_systems.autotools._autoreconf_search_path_args(spec) == []

View File

@ -133,7 +133,7 @@ def test_check_prefix_manifest(tmpdir):
spec = spack.spec.Spec("libelf") spec = spack.spec.Spec("libelf")
spec._mark_concrete() spec._mark_concrete()
spec.prefix = prefix spec.set_prefix(prefix)
results = spack.verify.check_spec_manifest(spec) results = spack.verify.check_spec_manifest(spec)
assert results.has_errors() assert results.has_errors()

View File

@ -35,8 +35,8 @@ def test_view_with_spec_not_contributing_files(mock_packages, tmpdir):
a = Spec("pkg-a") a = Spec("pkg-a")
b = Spec("pkg-b") b = Spec("pkg-b")
a.prefix = os.path.join(tmpdir, "a") a.set_prefix(os.path.join(tmpdir, "a"))
b.prefix = os.path.join(tmpdir, "b") b.set_prefix(os.path.join(tmpdir, "b"))
a._mark_concrete() a._mark_concrete()
b._mark_concrete() b._mark_concrete()

View File

@ -68,7 +68,9 @@ def project_env_mods(
*specs: spack.spec.Spec, view, env: environment.EnvironmentModifications *specs: spack.spec.Spec, view, env: environment.EnvironmentModifications
) -> None: ) -> None:
"""Given a list of environment modifications, project paths changes to the view.""" """Given a list of environment modifications, project paths changes to the view."""
prefix_to_prefix = {s.prefix: view.get_projection_for_spec(s) for s in specs if not s.external} prefix_to_prefix = {
str(s.prefix): view.get_projection_for_spec(s) for s in specs if not s.external
}
# Avoid empty regex if all external # Avoid empty regex if all external
if not prefix_to_prefix: if not prefix_to_prefix:
return return