Installer: queue only link/run deps and requeue with build deps as needed

Refactors BuildTask into separate classes BuildTask and InstallTask
Queues all packages as InstallTask, with link/run deps only
If an InstallTask fails to install from binary, a BuildTask is generated
The BuildTask is queued with dependencies on the new InstallTasks for its
build deps and their link/run dependencies.
The Tasks telescope open to include all build deps of build deps ad-hoc
This commit is contained in:
Gregory Becker 2024-08-28 15:13:54 -07:00
parent 2cd773aea4
commit c74d6117e5
No known key found for this signature in database
GPG Key ID: 2362541F6D14ED84
4 changed files with 169 additions and 118 deletions

View File

@ -118,6 +118,11 @@ class ExecuteResult(enum.Enum):
FAILED = enum.auto()
# Task is missing build spec and will be requeued
MISSING_BUILD_SPEC = enum.auto()
# Task is queued to install from binary but no binary found
MISSING_BINARY = enum.auto()
requeue_results = [ExecuteResult.MISSING_BUILD_SPEC, ExecuteResult.MISSING_BINARY]
class InstallAction(enum.Enum):
@ -806,16 +811,7 @@ def get_depflags(self, pkg: "spack.package_base.PackageBase") -> int:
depflag = dt.LINK | dt.RUN
include_build_deps = self.install_args.get("include_build_deps")
if self.pkg_id == package_id(pkg.spec):
cache_only = self.install_args.get("package_cache_only")
else:
cache_only = self.install_args.get("dependencies_cache_only")
# Include build dependencies if pkg is going to be built from sources, or
# if build deps are explicitly requested.
if include_build_deps or not (
cache_only or pkg.spec.installed and pkg.spec.dag_hash() not in self.overwrite
):
if include_build_deps:
depflag |= dt.BUILD
if self.run_tests(pkg):
depflag |= dt.TEST
@ -1143,7 +1139,6 @@ def execute(self, install_status):
"""
install_args = self.request.install_args
tests = install_args.get("tests")
unsigned = install_args.get("unsigned")
pkg, pkg_id = self.pkg, self.pkg_id
@ -1151,17 +1146,6 @@ def execute(self, install_status):
self.start = self.start or time.time()
self.status = BuildStatus.INSTALLING
# Use the binary cache if requested
if self.use_cache:
if _install_from_cache(pkg, self.explicit, unsigned):
return ExecuteResult.SUCCESS
elif self.cache_only:
raise spack.error.InstallError(
"No binary found when cache-only was specified", pkg=pkg
)
else:
tty.msg(f"No binary for {pkg_id} found: installing from source")
pkg.run_tests = tests is True or tests and pkg.name in tests
# hook that allows tests to inspect the Package before installation
@ -1194,6 +1178,70 @@ def execute(self, install_status):
return ExecuteResult.SUCCESS
class InstallTask(Task):
"""Class for representing a build task for a package."""
def execute(self, install_status):
"""
Perform the installation of the requested spec and/or dependency
represented by the build task.
"""
# no-op and requeue to build if not allowed to use cache
# this
if not self.use_cache:
return ExecuteResult.MISSING_BINARY
install_args = self.request.install_args
unsigned = install_args.get("unsigned")
pkg, pkg_id = self.pkg, self.pkg_id
tty.msg(install_msg(pkg_id, self.pid, install_status))
self.start = self.start or time.time()
self.status = STATUS_INSTALLING
try:
if _install_from_cache(pkg, self.explicit, unsigned):
if self.compiler:
_add_compiler_package_to_config(pkg)
return ExecuteResult.SUCCESS
elif self.cache_only:
raise InstallError("No binary found when cache-only was specified", pkg=pkg)
else:
tty.msg(f"No binary for {pkg_id} found: installing from source")
return ExecuteResult.MISSING_BINARY
except binary_distribution.NoChecksumException as exc:
if self.cache_only:
raise
tty.error(
f"Failed to install {self.pkg.name} from binary cache due "
f"to {str(exc)}: Requeueing to install from source."
)
return ExecuteResult.MISSING_BINARY
def build_task(self, installed):
build_task = BuildTask(
pkg=self.pkg,
request=self.request,
compiler=self.compiler,
start=0,
attempts=self.attempts,
status=STATUS_ADDED,
installed=installed,
)
# Fixup dependents in case it was changed by `add_dependent`
# This would be the case of a `build_spec` for a spliced spec
build_task.dependents = self.dependents
# Same for dependencies
build_task.dependencies = self.dependencies
build_task.uninstalled_deps = self.uninstalled_deps - installed
return build_task
class RewireTask(Task):
"""Class for representing a rewire task for a package."""
@ -1377,8 +1425,9 @@ def _add_init_task(
request: the associated install request
all_deps: dictionary of all dependencies and associated dependents
"""
cls = RewireTask if pkg.spec.spliced else BuildTask
task = cls(pkg, request=request, status=BuildStatus.QUEUED, installed=self.installed)
cls = RewireTask if pkg.spec.spliced else InstallTask
task: Task = cls(pkg, request=request, status=BuildStatus.QUEUED, installed=self.installed)
for dep_id in task.dependencies:
all_deps[dep_id].add(package_id(pkg.spec))
@ -1671,7 +1720,7 @@ def _requeue_with_build_spec_tasks(self, task):
"""Requeue the task and its missing build spec dependencies"""
# Full install of the build_spec is necessary because it didn't already exist somewhere
spec = task.pkg.spec
for dep in spec.build_spec.traverse():
for dep in spec.build_spec.traverse(deptype=task.request.get_depflags(task.pkg)):
dep_pkg = dep.package
dep_id = package_id(dep)
@ -1694,6 +1743,42 @@ def _requeue_with_build_spec_tasks(self, task):
spec_task.add_dependency(build_pkg_id)
self._push_task(spec_task)
def _requeue_as_build_task(self, task):
# TODO: handle the compile bootstrapping stuff?
spec = task.pkg.spec
build_dep_ids = []
for builddep in spec.dependencies(deptype=dt.BUILD):
# track which package ids are the direct build deps
build_dep_ids.append(package_id(builddep))
for dep in builddep.traverse(deptype=task.request.get_depflags(task.pkg)):
dep_pkg = dep.package
dep_id = package_id(dep)
if dep_id not in self.build_tasks and dep_id not in self.installed:
self._add_init_task(dep_pkg, task.request, False, self.all_dependencies)
# Clear any persistent failure markings _unless_ they
# are associated with another process in this parallel build
spack.store.STORE.failure_tracker.clear(dep, force=False)
# Remove InstallTask
self._remove_task(task.pkg_id)
# New task to build this spec from source
build_task = task.build_task(self.installed)
build_task_id = package_id(spec)
# Attach dependency relationships between spec and build deps
for build_dep_id in build_dep_ids:
if build_dep_id not in self.installed:
build_dep_task = self.build_tasks[build_dep_id]
build_dep_task.add_dependent(build_task_id)
build_task.add_dependency(build_dep_id)
# Add new Task -- this removes the old task as well
self._push_task(build_task)
def _add_tasks(self, request: BuildRequest, all_deps):
"""Add tasks to the priority queue for the given build request.
@ -1758,8 +1843,11 @@ def _install_task(self, task: Task, install_status: InstallStatus) -> None:
rc = task.execute(install_status)
if rc == ExecuteResult.MISSING_BUILD_SPEC:
self._requeue_with_build_spec_tasks(task)
elif rc == ExecuteResult.MISSING_BINARY:
self._requeue_as_build_task(task)
else: # if rc == ExecuteResult.SUCCESS or rc == ExecuteResult.FAILED
self._update_installed(task)
return rc # Used by reporters to skip requeued tasks
def _next_is_pri0(self) -> bool:
"""
@ -2176,20 +2264,6 @@ def install(self) -> None:
)
raise
except binary_distribution.NoChecksumException as exc:
if task.cache_only:
raise
# Checking hash on downloaded binary failed.
tty.error(
f"Failed to install {pkg.name} from binary cache due "
f"to {str(exc)}: Requeueing to install from source."
)
# this overrides a full method, which is ugly.
task.use_cache = False # type: ignore[misc]
self._requeue_task(task, install_status)
continue
except (Exception, SystemExit) as exc:
self._update_failed(task, True, exc)
@ -2225,6 +2299,11 @@ def install(self) -> None:
# Perform basic task cleanup for the installed spec to
# include downgrading the write to a read lock
if pkg.spec.installed:
# Do not clean up this was an overwrite that wasn't completed
overwrite = spec.dag_hash() in task.request.overwrite
rec, _ = self._check_db(pkg.spec)
incomplete = task.request.overwrite_time > rec.installation_time
if not (overwrite and incomplete):
self._cleanup_task(pkg)
# Cleanup, which includes releasing all of the read locks
@ -2545,7 +2624,11 @@ def install(self):
"""
try:
with fs.replace_directory_transaction(self.task.pkg.prefix):
self.installer._install_task(self.task, self.install_status)
rc = self.installer._install_task(self.task, self.install_status)
if rc in requeue_results:
raise Requeue # raise to trigger transactional replacement of directory
except Requeue:
pass # this job is requeuing, not failing
except fs.CouldNotRestoreDirectoryBackup as e:
self.database.remove(self.task.pkg.spec)
tty.error(
@ -2558,6 +2641,24 @@ def install(self):
raise e.inner_exception
class Requeue(Exception):
"""Raised when we need an error to indicate a requeueing situation.
While this is raised and excepted, it does not represent an Error."""
class InstallError(spack.error.SpackError):
"""Raised when something goes wrong during install or uninstall.
The error can be annotated with a ``pkg`` attribute to allow the
caller to get the package for which the exception was raised.
"""
def __init__(self, message, long_msg=None, pkg=None):
super().__init__(message, long_msg)
self.pkg = pkg
class BadInstallPhase(spack.error.InstallError):
"""Raised for an install phase option is not allowed for a package."""

View File

@ -101,17 +101,26 @@ def wrapper(instance, *args, **kwargs):
# installed explicitly will also be installed as a
# dependency of another spec. In this case append to both
# spec reports.
added = []
for current_spec in llnl.util.lang.dedupe([pkg.spec.root, pkg.spec]):
name = name_fmt.format(current_spec.name, current_spec.dag_hash(length=7))
try:
item = next((x for x in self.specs if x["name"] == name))
item["packages"].append(package)
added.append(item)
except StopIteration:
pass
start_time = time.time()
try:
value = wrapped_fn(instance, *args, **kwargs)
# If we are requeuing the task, it neither succeeded nor failed
# remove the package so we don't count it (yet) in either category
if value in spack.installer.requeue_results:
for item in added:
item["packages"].remove(package)
package["stdout"] = self.fetch_log(pkg)
package["installed_from_binary_cache"] = pkg.installed_from_binary_cache
self.on_success(pkg, kwargs, package)

View File

@ -56,33 +56,14 @@ def test_build_request_strings(install_mockery):
@pytest.mark.parametrize(
"package_cache_only,dependencies_cache_only,package_deptypes,dependencies_deptypes",
[
(False, False, dt.BUILD | dt.LINK | dt.RUN, dt.BUILD | dt.LINK | dt.RUN),
(True, False, dt.LINK | dt.RUN, dt.BUILD | dt.LINK | dt.RUN),
(False, True, dt.BUILD | dt.LINK | dt.RUN, dt.LINK | dt.RUN),
(True, True, dt.LINK | dt.RUN, dt.LINK | dt.RUN),
],
"include_build_deps,deptypes", [(True, dt.BUILD | dt.LINK | dt.RUN), (False, dt.LINK | dt.RUN)]
)
def test_build_request_deptypes(
install_mockery,
package_cache_only,
dependencies_cache_only,
package_deptypes,
dependencies_deptypes,
):
def test_build_request_deptypes(install_mockery, include_build_deps, deptypes):
s = spack.concretize.concretize_one("dependent-install")
build_request = inst.BuildRequest(
s.package,
{
"package_cache_only": package_cache_only,
"dependencies_cache_only": dependencies_cache_only,
},
)
build_request = inst.BuildRequest(s.package, {"include_build_deps": include_build_deps})
actual_package_deptypes = build_request.get_depflags(s.package)
actual_dependency_deptypes = build_request.get_depflags(s["dependency-install"].package)
package_deptypes = build_request.get_depflags(s.package)
dependency_deptypes = build_request.get_depflags(s["dependency-install"].package)
assert actual_package_deptypes == package_deptypes
assert actual_dependency_deptypes == dependencies_deptypes
assert package_deptypes == dependency_deptypes == deptypes

View File

@ -77,6 +77,13 @@ def create_build_task(
return inst.BuildTask(pkg, request=request, status=inst.BuildStatus.QUEUED)
def create_install_task(
pkg: spack.package_base.PackageBase, install_args: Optional[dict] = None
) -> inst.BuildTask:
request = inst.BuildRequest(pkg, {} if install_args is None else install_args)
return inst.InstallTask(pkg, request, False, 0, 0, inst.STATUS_ADDED, set())
def create_installer(
specs: Union[List[str], List[spack.spec.Spec]], install_args: Optional[dict] = None
) -> inst.PackageInstaller:
@ -221,54 +228,6 @@ def test_installer_str(install_mockery):
assert "failed (0)" in istr
def test_installer_prune_built_build_deps(install_mockery, monkeypatch, tmpdir):
r"""
Ensure that build dependencies of installed deps are pruned
from installer package queues.
(a)
/ \
/ \
(b) (c) <--- is installed already so we should
\ / | \ prune (f) from this install since
\ / | \ it is *only* needed to build (b)
(d) (e) (f)
Thus since (c) is already installed our build_pq dag should
only include four packages. [(a), (b), (c), (d), (e)]
"""
@property
def _mock_installed(self):
return self.name == "pkg-c"
# Mock the installed property to say that (b) is installed
monkeypatch.setattr(spack.spec.Spec, "installed", _mock_installed)
# Create mock repository with packages (a), (b), (c), (d), and (e)
builder = spack.repo.MockRepositoryBuilder(tmpdir.mkdir("mock-repo"))
builder.add_package("pkg-a", dependencies=[("pkg-b", "build", None), ("pkg-c", "build", None)])
builder.add_package("pkg-b", dependencies=[("pkg-d", "build", None)])
builder.add_package(
"pkg-c",
dependencies=[("pkg-d", "build", None), ("pkg-e", "all", None), ("pkg-f", "build", None)],
)
builder.add_package("pkg-d")
builder.add_package("pkg-e")
builder.add_package("pkg-f")
with spack.repo.use_repositories(builder.root):
installer = create_installer(["pkg-a"])
installer._init_queue()
# Assert that (c) is not in the build_pq
result = {task.pkg_id[:5] for _, task in installer.build_pq}
expected = {"pkg-a", "pkg-b", "pkg-c", "pkg-d", "pkg-e"}
assert result == expected
def test_check_before_phase_error(install_mockery):
s = spack.concretize.concretize_one("trivial-install-test-package")
s.package.stop_before_phase = "beforephase"
@ -605,7 +564,7 @@ def test_check_deps_status_external(install_mockery, monkeypatch):
monkeypatch.setattr(spack.spec.Spec, "external", True)
installer._check_deps_status(request)
for dep in request.spec.traverse(root=False):
for dep in request.spec.traverse(root=False, deptype=request.get_depflags(request.spec)):
assert inst.package_id(dep) in installer.installed
@ -617,7 +576,7 @@ def test_check_deps_status_upstream(install_mockery, monkeypatch):
monkeypatch.setattr(spack.spec.Spec, "installed_upstream", True)
installer._check_deps_status(request)
for dep in request.spec.traverse(root=False):
for dep in request.spec.traverse(root=False, deptype=request.get_depflags(request.spec)):
assert inst.package_id(dep) in installer.installed
@ -673,7 +632,8 @@ def test_install_spliced_build_spec_installed(install_mockery, capfd, mock_fetch
installer = create_installer([out], {"verbose": True, "fail_fast": True})
installer._init_queue()
for _, task in installer.build_pq:
assert isinstance(task, inst.RewireTask if task.pkg.spec.spliced else inst.BuildTask)
assert isinstance(task, inst.RewireTask if task.pkg.spec.spliced else inst.InstallTask)
installer.install()
for node in out.traverse():
assert node.installed
@ -724,7 +684,7 @@ def test_install_splice_root_from_binary(
def test_install_task_use_cache(install_mockery, monkeypatch):
installer = create_installer(["trivial-install-test-package"], {})
request = installer.build_requests[0]
task = create_build_task(request.pkg)
task = create_install_task(request.pkg)
monkeypatch.setattr(inst, "_install_from_cache", _true)
installer._install_task(task, None)