refactors and test fixes

This commit is contained in:
kshea21 2025-02-10 18:11:18 -08:00 committed by Gregory Becker
parent 2da51eaec7
commit 4a153a185b
No known key found for this signature in database
GPG Key ID: 2362541F6D14ED84
4 changed files with 293 additions and 295 deletions

View File

@ -1156,6 +1156,18 @@ def complete(self):
"""
return complete_build_process(self)
def terminate_processes(self):
"""Terminate the active child processes if installation failure/error"""
if self.process.is_alive():
# opportunity for graceful termination
self.process.terminate()
self.process.join(timeout=1)
# if the process didn't gracefully terminate, forcefully kill
if self.process.is_alive():
os.kill(self.process.pid, signal.SIGKILL)
self.process.join()
def _setup_pkg_and_run(
serialized_pkg: "spack.subprocess_context.PackageInstallContext",

View File

@ -399,10 +399,10 @@ def stand_alone_tests(self, kwargs, timeout: Optional[int] = None) -> None:
"""
import spack.build_environment # avoid circular dependency
spack.build_environment.start_build_process(
ph = spack.build_environment.start_build_process(
self.pkg, test_process, kwargs, timeout=timeout
)
spack.build_environment.complete_build_process()
spack.build_environment.ProcessHandle.complete(ph)
def parts(self) -> int:
"""The total number of (checked) test parts."""

View File

@ -73,6 +73,7 @@
#: were added (see https://docs.python.org/2/library/heapq.html).
_counter = itertools.count(0)
_fail_fast_err = "Terminating after first install failure"
class BuildStatus(enum.Enum):
"""Different build (task) states."""
@ -1066,6 +1067,7 @@ def flag_installed(self, installed: List[str]) -> None:
level=2,
)
def _setup_install_dir(self, pkg: "spack.package_base.PackageBase") -> None:
"""
Create and ensure proper access controls for the install directory.
@ -1099,6 +1101,44 @@ def _setup_install_dir(self, pkg: "spack.package_base.PackageBase") -> None:
# Always write host environment - we assume this can change
spack.store.STORE.layout.write_host_environment(pkg.spec)
@property
def install_action(self: "Task") -> InstallAction:
"""
Determine whether the installation should be overwritten (if it already
exists) or skipped (if has been handled by another process).
If the package has not been installed yet, this will indicate that the
installation should proceed as normal (i.e. no need to transactionally
preserve the old prefix).
"""
# If we don't have to overwrite, do a normal install
if self.pkg.spec.dag_hash() not in self.request.overwrite:
return InstallAction.INSTALL
# If it's not installed, do a normal install as well
rec, installed = check_db(self.pkg.spec)
if not installed:
return InstallAction.INSTALL
# Ensure install_tree projections have not changed.
assert rec and self.pkg.prefix == rec.path
# If another process has overwritten this, we shouldn't install at all
if rec.installation_time >= self.request.overwrite_time:
return InstallAction.NONE
# If the install prefix is missing, warn about it, and proceed with
# normal install.
if not os.path.exists(task.pkg.prefix):
tty.debug("Missing installation to overwrite")
return InstallAction.INSTALL
# Otherwise, do an actual overwrite install. We backup the original
# install directory, put the old prefix
# back on failure
return InstallAction.OVERWRITE
@property
def explicit(self) -> bool:
return self.pkg.spec.dag_hash() in self.request.install_args.get("explicit", [])
@ -1142,6 +1182,27 @@ def priority(self):
"""The priority is based on the remaining uninstalled dependencies."""
return len(self.uninstalled_deps)
def check_db(
spec: "spack.spec.Spec"
) -> Tuple[Optional[spack.database.InstallRecord], bool]:
"""Determine if the spec is flagged as installed in the database
Args:
spec: spec whose database install status is being checked
Return:
Tuple of optional database record, and a boolean installed_in_db
that's ``True`` iff the spec is considered installed
"""
try:
rec = spack.store.STORE.db.get_record(spec)
installed_in_db = rec.installed if rec else False
except KeyError:
# KeyError is raised if there is no matching spec in the database
# (versus no matching specs that are installed).
rec = None
installed_in_db = False
return rec, installed_in_db
class BuildTask(Task):
"""Class for representing a build task for a package."""
@ -1163,6 +1224,7 @@ def start(self):
unsigned = install_args.get("unsigned")
pkg, pkg_id = self.pkg, self.pkg_id
self.start_time = self.start_time or time.time()
action = self.install_action
# Use the binary cache to install if requested,
# save result to be handled in BuildTask.complete()
@ -1180,7 +1242,6 @@ def start(self):
# if there's an error result, don't start a new process, and leave
if self.error_result is not None:
print("got to the start error handling !!! !! !!!")
return
# Create stage object now and let it be serialized for the child process. That
@ -1189,8 +1250,10 @@ def start(self):
self._setup_install_dir(pkg)
# Create a child process to do the actual installation.
child_process = overwrite_process if action == InstallAction.OVERWRITE else build_process
self.process_handle = spack.build_environment.start_build_process(
self.pkg, build_process, self.request.install_args
self.pkg, child_process, self.request.install_args
)
# Identify the child process
@ -1455,27 +1518,6 @@ def _add_init_task(
self._push_task(task)
def _check_db(
self, spec: "spack.spec.Spec"
) -> Tuple[Optional[spack.database.InstallRecord], bool]:
"""Determine if the spec is flagged as installed in the database
Args:
spec: spec whose database install status is being checked
Return:
Tuple of optional database record, and a boolean installed_in_db
that's ``True`` if the spec is considered installed
"""
try:
rec = spack.store.STORE.db.get_record(spec)
installed_in_db = rec.installed if rec else False
except KeyError:
# KeyError is raised if there is no matching spec in the database
# (versus no matching specs that are installed).
rec = None
installed_in_db = False
return rec, installed_in_db
def _check_deps_status(self, request: BuildRequest) -> None:
"""Check the install status of the requested package
@ -1509,7 +1551,7 @@ def _check_deps_status(self, request: BuildRequest) -> None:
# Check the database to see if the dependency has been installed
# and flag as such if appropriate
rec, installed_in_db = self._check_db(dep)
rec, installed_in_db = check_db(dep)
if (
rec
and installed_in_db
@ -1547,7 +1589,7 @@ def _prepare_for_install(self, task: Task) -> None:
return
# Determine if the spec is flagged as installed in the database
rec, installed_in_db = self._check_db(task.pkg.spec)
rec, installed_in_db = check_db(task.pkg.spec)
if not installed_in_db:
# Ensure there is no other installed spec with the same prefix dir
@ -2074,59 +2116,8 @@ def _init_queue(self) -> None:
task.add_dependent(dependent_id)
self.all_dependencies = all_dependencies
def _install_action(self, task: Task) -> InstallAction:
"""
Determine whether the installation should be overwritten (if it already
exists) or skipped (if has been handled by another process).
If the package has not been installed yet, this will indicate that the
installation should proceed as normal (i.e. no need to transactionally
preserve the old prefix).
"""
# If we don't have to overwrite, do a normal install
if task.pkg.spec.dag_hash() not in task.request.overwrite:
return InstallAction.INSTALL
# If it's not installed, do a normal install as well
rec, installed = self._check_db(task.pkg.spec)
if not installed:
return InstallAction.INSTALL
# Ensure install_tree projections have not changed.
assert rec and task.pkg.prefix == rec.path
# If another process has overwritten this, we shouldn't install at all
if rec.installation_time >= task.request.overwrite_time:
return InstallAction.NONE
# If the install prefix is missing, warn about it, and proceed with
# normal install.
if not os.path.exists(task.pkg.prefix):
tty.debug("Missing installation to overwrite")
return InstallAction.INSTALL
# Otherwise, do an actual overwrite install. We backup the original
# install directory, put the old prefix
# back on failure
return InstallAction.OVERWRITE
def install(self) -> None:
"""Install the requested package(s) and or associated dependencies."""
self._init_queue()
fail_fast_err = "Terminating after first install failure"
single_requested_spec = len(self.build_requests) == 1
failed_build_requests = []
install_status = InstallStatus(len(self.build_pq))
active_tasks: List[Task] = []
# Only enable the terminal status line when we're in a tty without debug info
# enabled, so that the output does not get cluttered.
term_status = TermStatusLine(
enabled=sys.stdout.isatty() and tty.msg_enabled() and not tty.is_debug()
)
def start_task(task) -> None:
def start_task(self, task: Task, install_status: InstallStatus, term_status: TermStatusLine) -> None:
"""Attempts to start a package installation."""
pkg, pkg_id, spec = task.pkg, task.pkg_id, task.pkg.spec
install_status.next_pkg(pkg)
@ -2150,7 +2141,7 @@ def start_task(task) -> None:
self._update_failed(task)
if self.fail_fast:
task.error_result = spack.error.InstallError(fail_fast_err, pkg=pkg)
task.error_result = spack.error.InstallError(_fail_fast_err, pkg=pkg)
# Attempt to get a write lock. If we can't get the lock then
# another process is likely (un)installing the spec or has
@ -2224,25 +2215,19 @@ def start_task(task) -> None:
# Proceed with the installation since we have an exclusive write
# lock on the package.
install_status.set_term_title(f"Installing {task.pkg.name}")
action = self._install_action(task)
action = task.install_action
if action == InstallAction.INSTALL:
if action in (InstallAction.INSTALL, InstallAction.OVERWRITE):
# Start a child process for a task that's ready to be installed.
task.start()
tty.msg(install_msg(pkg_id, self.pid, install_status))
elif action == InstallAction.OVERWRITE:
# spack.store.STORE.db is not really a Database object, but a small
# wrapper -- silence mypy
OverwriteInstall(
self, spack.store.STORE.db, task, install_status
).install() # type: ignore[arg-type] # noqa: E501
def complete_task(task) -> None:
def complete_task(self, task: Task, install_status: InstallStatus) -> Optional[Tuple]:
"""Attempts to complete a package installation."""
pkg, pkg_id = task.pkg, task.pkg_id
install_args = task.request.install_args
keep_prefix = install_args.get("keep_prefix")
action = self._install_action(task)
action = task.install_action
try:
self._complete_task(task, install_status)
@ -2273,6 +2258,18 @@ def complete_task(task) -> None:
self._requeue_task(task, install_status)
return None
# Overwrite process exception handling
except fs.CouldNotRestoreDirectoryBackup as e:
self.database.remove(task.pkg.spec)
tty.error(
f"Recovery of install dir of {task.pkg.name} failed due to "
f"{e.outer_exception.__class__.__name__}: {str(e.outer_exception)}. "
"The spec is now uninstalled."
)
# Unwrap the actual installation exception.
raise e.inner_exception
except (Exception, SystemExit) as exc:
self._update_failed(task, True, exc)
@ -2290,14 +2287,14 @@ def complete_task(task) -> None:
# Terminate if requested to do so on the first failure.
if self.fail_fast:
raise spack.error.InstallError(
f"{fail_fast_err}: {str(exc)}", pkg=pkg
f"{_fail_fast_err}: {str(exc)}", pkg=pkg
) from exc
# Terminate when a single build request has failed, or summarize errors later.
if task.is_build_request:
if single_requested_spec:
if len(self.build_requests) == 1:
raise
failed_build_requests.append((pkg, pkg_id, str(exc)))
return (pkg, pkg_id, str(exc))
finally:
# Remove the install prefix if anything went wrong during
@ -2310,6 +2307,20 @@ def complete_task(task) -> None:
if pkg.spec.installed:
self._cleanup_task(pkg)
def install(self) -> None:
"""Install the requested package(s) and or associated dependencies."""
self._init_queue()
failed_build_requests = []
install_status = InstallStatus(len(self.build_pq))
active_tasks: List[Task] = []
# Only enable the terminal status line when we're in a tty without debug info
# enabled, so that the output does not get cluttered.
term_status = TermStatusLine(
enabled=sys.stdout.isatty() and tty.msg_enabled() and not tty.is_debug()
)
# While a task is ready or tasks are running
while self._peek_ready_task() or active_tasks:
# While there's space for more active tasks to start
@ -2322,7 +2333,7 @@ def complete_task(task) -> None:
active_tasks.append(task)
try:
# Attempt to start the task's package installation
start_task(task)
self.start_task(task, install_status, term_status)
except BaseException as e:
# Delegating any exception that happens in start_task() to be
# handled in complete_task()
@ -2330,19 +2341,17 @@ def complete_task(task) -> None:
time.sleep(0.1)
# Check if any tasks have completed and add to list
#for task in active_tasks:
# print("what are the tasks",task)
done = [task for task in active_tasks if task.poll()]
# Iterate through the done tasks and complete them
for task in done:
try:
complete_task(task)
failure = self.complete_task(task, install_status)
if failure:
failed_build_requests.append(failure)
except:
# Terminate any active child processes if there's an installation error
for task in active_tasks:
print("terminate active tasks for loop")
if task.process_handle is not None:
print("are we trying to shut down a tangential active process")
task.process_handle.terminate_processes()
raise
finally:
@ -2631,6 +2640,15 @@ def build_process(pkg: "spack.package_base.PackageBase", install_args: dict) ->
with spack.util.path.filter_padding():
return installer.run()
def overwrite_process(pkg: "spack.package_base.PackageBase", install_args: dict) -> bool:
# TODO:I don't know if this comment accurately reflects what's going on anymore
# TODO: think it should move to the error handling
# Try to run the install task overwriting the package prefix.
# If this fails, try to recover the original install prefix. If that fails
# too, mark the spec as uninstalled. This function always the original
# install error if installation fails.
with fs.replace_directory_transaction(pkg.prefix):
return build_process(pkg, install_args)
def deprecate(spec: "spack.spec.Spec", deprecator: "spack.spec.Spec", link_fn) -> None:
"""Deprecate this package in favor of deprecator spec"""
@ -2668,45 +2686,7 @@ def deprecate(spec: "spack.spec.Spec", deprecator: "spack.spec.Spec", link_fn) -
link_fn(deprecator.prefix, spec.prefix)
class OverwriteInstall:
def __init__(
self,
installer: PackageInstaller,
database: spack.database.Database,
task: Task,
install_status: InstallStatus,
):
self.installer = installer
self.database = database
self.task = task
self.install_status = install_status
def install(self):
"""
Try to complete the install task overwriting the package prefix.
If this fails, try to recover the original install prefix. If that fails
too, mark the spec as uninstalled. This function always the original
install error if installation fails.
"""
try:
with fs.replace_directory_transaction(self.task.pkg.prefix):
self.installer._complete_task(self.task, self.install_status)
self.installer.install()
except fs.CouldNotRestoreDirectoryBackup as e:
self.database.remove(self.task.pkg.spec)
tty.error(
f"Recovery of install dir of {self.task.pkg.name} failed due to "
f"{e.outer_exception.__class__.__name__}: {str(e.outer_exception)}. "
"The spec is now uninstalled."
)
# Unwrap the actual installation exception.
raise e.inner_exception
class BadInstallPhase(spack.error.InstallError):
"""Raised for an install phase option is not allowed for a package."""
def __init__(self, pkg_name, phase):
super().__init__(f"'{phase}' is not a valid phase for package {pkg_name}")

View File

@ -1198,26 +1198,32 @@ def test_install_implicit(install_mockery, mock_fetch):
assert not create_build_task(pkg).explicit
#### WIP #####
def test_overwrite_install_backup_success(temporary_store, config, mock_packages, tmpdir):
"""
When doing an overwrite install that fails, Spack should restore the backup
of the original prefix, and leave the original spec marked installed.
"""
# Note: this test relies on installing a package with no dependencies
# Get a build task. TODO: refactor this to avoid calling internal methods
installer = create_installer(["pkg-c"])
# call overwrite_install and have it fail
# active the error handling
# ensure that the backup is restored
# ensure that the original spec is still installed
# Get a build task. TODO: Refactor this to avoid calling internal methods.
installer = create_installer(["pkg-b"])
installer._init_queue()
task = installer._pop_ready_task()
task = installer._pop_task()
# Make sure the install prefix exists with some trivial file
installed_file = os.path.join(task.pkg.prefix, "some_file")
fs.touchp(installed_file)
class InstallerThatWipesThePrefixDir:
def install(self):
shutil.rmtree(task.pkg.prefix, ignore_errors=True)
fs.mkdirp(task.pkg.prefix)
raise Exception("Some fatal install error")
# Install that wipes the prefix directory
def wiped_installer():
shutil.rmtree(task.pkg.prefix)
class FakeDatabase:
called = False