refactor transaction across start/complete

This commit is contained in:
Gregory Becker 2025-02-19 16:37:41 -08:00
parent 4e9547703c
commit f1638365a9
No known key found for this signature in database
GPG Key ID: 2362541F6D14ED84
3 changed files with 96 additions and 59 deletions

View File

@ -1030,6 +1030,9 @@ def replace_directory_transaction(directory_name):
Returns: Returns:
temporary directory where ``directory_name`` has been moved temporary directory where ``directory_name`` has been moved
""" """
for a, b, c in os.walk(directory_name):
print("PRE", a, b, c)
# Check the input is indeed a directory with absolute path. # Check the input is indeed a directory with absolute path.
# Raise before anything is done to avoid moving the wrong directory # Raise before anything is done to avoid moving the wrong directory
directory_name = os.path.abspath(directory_name) directory_name = os.path.abspath(directory_name)
@ -1061,10 +1064,11 @@ def replace_directory_transaction(directory_name):
raise CouldNotRestoreDirectoryBackup(inner_exception, outer_exception) raise CouldNotRestoreDirectoryBackup(inner_exception, outer_exception)
for a, b, c in os.walk(directory_name): for a, b, c in os.walk(directory_name):
print(a,b,c) print("RESTORED", a, b, c)
tty.debug("Directory recovered [{0}]".format(directory_name)) tty.debug("Directory recovered [{0}]".format(directory_name))
raise raise
else: else:
print("NO FAILURE")
# Otherwise delete the temporary directory # Otherwise delete the temporary directory
shutil.rmtree(tmpdir, ignore_errors=True) shutil.rmtree(tmpdir, ignore_errors=True)
tty.debug("Temporary directory deleted [{0}]".format(tmpdir)) tty.debug("Temporary directory deleted [{0}]".format(tmpdir))

View File

@ -35,6 +35,7 @@
import shutil import shutil
import sys import sys
import time import time
import tempfile
from collections import defaultdict from collections import defaultdict
from gzip import GzipFile from gzip import GzipFile
from typing import Dict, Iterator, List, Optional, Set, Tuple, Union, Callable from typing import Dict, Iterator, List, Optional, Set, Tuple, Union, Callable
@ -43,7 +44,7 @@
import llnl.util.lock as lk import llnl.util.lock as lk
import llnl.util.tty as tty import llnl.util.tty as tty
from llnl.string import ordinal from llnl.string import ordinal
from llnl.util.lang import pretty_seconds from llnl.util.lang import nullcontext, pretty_seconds
from llnl.util.tty.color import colorize from llnl.util.tty.color import colorize
from llnl.util.tty.log import log_output from llnl.util.tty.log import log_output
@ -979,6 +980,9 @@ def __init__(
# initialize cache variables # initialize cache variables
self._install_action = None self._install_action = None
# context to allow overwriting
self.context = nullcontext()
def start(self): def start(self):
"""Start the work of this task.""" """Start the work of this task."""
raise NotImplementedError raise NotImplementedError
@ -1227,6 +1231,8 @@ class BuildTask(Task):
process_handle: Optional["spack.build_environment.ProcessHandle"] = None process_handle: Optional["spack.build_environment.ProcessHandle"] = None
started: bool = False started: bool = False
no_op: bool = False no_op: bool = False
tmpdir = None
backup_dir = None
def start(self): def start(self):
"""Attempt to use the binary cache to install """Attempt to use the binary cache to install
@ -1237,6 +1243,11 @@ def start(self):
for a, b, c in os.walk(self.pkg.prefix): for a, b, c in os.walk(self.pkg.prefix):
print("start", a, b, c) print("start", a, b, c)
if self.install_action == InstallAction.OVERWRITE:
self.tmpdir = tempfile.mkdtemp(dir=os.path.dirname(self.pkg.prefix), prefix=".backup")
self.backup_dir = os.path.join(self.tmpdir, "backup")
os.rename(self.pkg.prefix, self.backup_dir)
assert not self.started, "Cannot start a task that has already been started." assert not self.started, "Cannot start a task that has already been started."
self.started = True self.started = True
@ -1260,6 +1271,9 @@ def start(self):
else: else:
tty.msg(f"No binary for {pkg_id} found: installing from source") tty.msg(f"No binary for {pkg_id} found: installing from source")
for a, b, c in os.walk(self.pkg.prefix):
print("start2", a, b, c)
# if there's an error result, don't start a new process, and leave # if there's an error result, don't start a new process, and leave
if self.error_result is not None: if self.error_result is not None:
return return
@ -1269,13 +1283,11 @@ def start(self):
pkg.stage pkg.stage
self._setup_install_dir(pkg) self._setup_install_dir(pkg)
for a, b, c in os.walk(self.pkg.prefix):
print("start3", a, b, c)
# Create a child process to do the actual installation. # Create a child process to do the actual installation.
process_start_method = spack.build_environment.start_build_process self.process_handle = spack.build_environment.start_build_process(
if action == InstallAction.OVERWRITE:
process_start_method = overwrite_start_build_process
self.process_handle = process_start_method(
self.pkg, build_process, self.request.install_args self.pkg, build_process, self.request.install_args
) )
@ -1290,6 +1302,27 @@ def poll(self):
), "Can't call `poll()` before `start()` or identified no-operation task" ), "Can't call `poll()` before `start()` or identified no-operation task"
return self.no_op or self.success_result or self.error_result or self.process_handle.poll() return self.no_op or self.success_result or self.error_result or self.process_handle.poll()
def succeed(self):
# delete the temporary backup for an overwrite
# see llnl.util.filesystem.restore_directory_transaction
if self.install_action == InstallAction.OVERWRITE:
shutil.rmtree(self.tmpdir, ignore_errors=True)
def fail(self, inner_exception):
if self.install_action != InstallAction.OVERWRITE:
raise inner_exception
# restore the overwrite directory from backup
# see llnl.util.filesystem.restore_directory_transaction
try:
if os.path.exists(self.pkg.prefix):
shutil.rmtree(self.pkg.prefix)
os.rename(self.backup_dir, self.pkg.prefix)
except Exception as outer_exception:
raise fs.CouldNotRestoreDirectoryBackup(inner_exception, outer_exception)
raise inner_exception
def complete(self): def complete(self):
""" """
Complete the installation of the requested spec and/or dependency Complete the installation of the requested spec and/or dependency
@ -1310,23 +1343,24 @@ def complete(self):
# If task has been identified as a no operation, # If task has been identified as a no operation,
# return ExecuteResult.NOOP # return ExecuteResult.NOOP
if self.no_op: if self.no_op:
self.succeed()
return ExecuteResult.NO_OP return ExecuteResult.NO_OP
# If installing a package from binary cache is successful, # If installing a package from binary cache is successful,
# return ExecuteResult.SUCCESS # return ExecuteResult.SUCCESS
if self.success_result is not None: if self.success_result is not None:
self.succeed()
return self.success_result return self.success_result
# If an error arises from installing a package, # If an error arises from installing a package,
# raise spack.error.InstallError # raise spack.error.InstallError
if self.error_result is not None: if self.error_result is not None:
self.fail(self.error_result)
print("error result in completei 2",self.error_result)
raise self.error_result
# hook that allows tests to inspect the Package before installation # hook that allows tests to inspect the Package before installation
# see unit_test_check() docs. # see unit_test_check() docs.
if not pkg.unit_test_check(): if not pkg.unit_test_check():
self.succeed()
return ExecuteResult.FAILED return ExecuteResult.FAILED
try: try:
@ -1342,9 +1376,10 @@ def complete(self):
pid = f"{self.pid}: " if tty.show_pid() else "" pid = f"{self.pid}: " if tty.show_pid() else ""
tty.debug(f"{pid}{str(e)}") tty.debug(f"{pid}{str(e)}")
tty.debug(f"Package stage directory: {pkg.stage.source_path}") tty.debug(f"Package stage directory: {pkg.stage.source_path}")
finally: except (Exception, KeyboardInterrupt, SystemExit) as e:
print("error result in completei 4",self.error_result) self.fail(e)
self.succeed()
return ExecuteResult.SUCCESS return ExecuteResult.SUCCESS
def terminate(self) -> None: def terminate(self) -> None:
@ -1905,6 +1940,7 @@ def _complete_task(self, task: Task, install_status: InstallStatus) -> None:
install_status: the installation status for the package install_status: the installation status for the package
""" """
rc = task.complete() rc = task.complete()
task.context.__exit__()
if rc == ExecuteResult.MISSING_BUILD_SPEC: if rc == ExecuteResult.MISSING_BUILD_SPEC:
self._requeue_with_build_spec_tasks(task) self._requeue_with_build_spec_tasks(task)
elif rc == ExecuteResult.NO_OP: elif rc == ExecuteResult.NO_OP:
@ -2251,6 +2287,9 @@ def start_task(
task.no_op = True task.no_op = True
return return
for a, b, c in os.walk(task.pkg.prefix):
print("8", a, b, c)
# Having a read lock on an uninstalled pkg may mean another # Having a read lock on an uninstalled pkg may mean another
# process completed an uninstall of the software between the # process completed an uninstall of the software between the
# time we failed to acquire the write lock and the time we # time we failed to acquire the write lock and the time we
@ -2265,6 +2304,9 @@ def start_task(
task.no_op = True task.no_op = True
return return
for a, b, c in os.walk(task.pkg.prefix):
print("9", a, b, c)
# Proceed with the installation since we have an exclusive write # Proceed with the installation since we have an exclusive write
# lock on the package. # lock on the package.
install_status.set_term_title(f"Installing {task.pkg.name}") install_status.set_term_title(f"Installing {task.pkg.name}")
@ -2275,6 +2317,9 @@ def start_task(
task.start() task.start()
tty.msg(install_msg(pkg_id, self.pid, install_status)) tty.msg(install_msg(pkg_id, self.pid, install_status))
for a, b, c in os.walk(task.pkg.prefix):
print("10", a, b, c)
def complete_task(self, task: Task, install_status: InstallStatus) -> Optional[Tuple]: def complete_task(self, task: Task, install_status: InstallStatus) -> Optional[Tuple]:
"""Attempts to complete a package installation.""" """Attempts to complete a package installation."""
pkg, pkg_id = task.pkg, task.pkg_id pkg, pkg_id = task.pkg, task.pkg_id
@ -2325,20 +2370,8 @@ def complete_task(self, task: Task, install_status: InstallStatus) -> Optional[T
except (Exception, SystemExit) as exc: except (Exception, SystemExit) as exc:
# Overwrite process exception handling # Overwrite process exception handling
if isinstance(exc, spack.build_environment.ChildError) and exc.name == "CouldNotRestoreDirectoryBackup":
print("could not restore exception")
spack.store.STORE.db.remove(task.pkg.spec)
tty.error(
f"Recovery of install dir of {task.pkg.name} failed due to "
f"{exc.outer_exception.__class__.__name__}: {str(exc.outer_exception)}. "
"The spec is now uninstalled."
)
# Unwrap the actual installation exception.
raise exc.inner_exception
self._update_failed(task, True, exc) self._update_failed(task, True, exc)
print("HEHEREHERERES")
# Best effort installs suppress the exception and mark the # Best effort installs suppress the exception and mark the
# package as a failure. # package as a failure.
if not isinstance(exc, spack.error.SpackError) or not exc.printed: # type: ignore[union-attr] # noqa: E501 if not isinstance(exc, spack.error.SpackError) or not exc.printed: # type: ignore[union-attr] # noqa: E501
@ -2356,6 +2389,7 @@ def complete_task(self, task: Task, install_status: InstallStatus) -> Optional[T
# Terminate when a single build request has failed, or summarize errors later. # Terminate when a single build request has failed, or summarize errors later.
if task.is_build_request: if task.is_build_request:
if len(self.build_requests) == 1: if len(self.build_requests) == 1:
print("1")
raise raise
return (pkg, pkg_id, str(exc)) return (pkg, pkg_id, str(exc))
@ -2411,8 +2445,13 @@ def install(self) -> None:
for task in done: for task in done:
try: try:
# If complete_task does not return None, the build request failed # If complete_task does not return None, the build request failed
print("install_status when passed to complete_task", install_status, type(install_status)) print(
"install_status when passed to complete_task",
install_status,
type(install_status),
)
failure = self.complete_task(task, install_status) failure = self.complete_task(task, install_status)
print("COMPLETED WITH FAILURE", failure)
if failure: if failure:
failed_build_requests.append(failure) failed_build_requests.append(failure)
except Exception: except Exception:
@ -2460,6 +2499,7 @@ def install(self) -> None:
] ]
if failed_build_requests or missing: if failed_build_requests or missing:
print("BBBBBBB")
for _, pkg_id, err in failed_build_requests: for _, pkg_id, err in failed_build_requests:
tty.error(f"{pkg_id}: {err}") tty.error(f"{pkg_id}: {err}")
@ -2706,14 +2746,6 @@ def build_process(pkg: "spack.package_base.PackageBase", install_args: dict) ->
return installer.run() return installer.run()
def overwrite_start_build_process(pkg: "spack.package_base.PackageBase", method: Callable, install_args: dict) -> bool:
# Try to run the install task overwriting the package prefix.
# If this fails, try to recover the original install prefix. If that fails
# too, mark the spec as uninstalled. This function always the original
# install error if installation fails. See ''complete_task()''
with fs.replace_directory_transaction(pkg.prefix):
return spack.build_environment.start_build_process(pkg, method, install_args)
def deprecate(spec: "spack.spec.Spec", deprecator: "spack.spec.Spec", link_fn) -> None: def deprecate(spec: "spack.spec.Spec", deprecator: "spack.spec.Spec", link_fn) -> None:
"""Deprecate this package in favor of deprecator spec""" """Deprecate this package in favor of deprecator spec"""
# Here we assume we don't deprecate across different stores, and that same hash # Here we assume we don't deprecate across different stores, and that same hash

View File

@ -1222,6 +1222,7 @@ def test_install_implicit(install_mockery, mock_fetch):
# Install that wipes the prefix directory # Install that wipes the prefix directory
def wipe_prefix(pkg, install_args): def wipe_prefix(pkg, install_args):
print("AAAAAAAAA")
shutil.rmtree(pkg.prefix, ignore_errors=True) shutil.rmtree(pkg.prefix, ignore_errors=True)
fs.mkdirp(pkg.prefix) fs.mkdirp(pkg.prefix)
raise Exception("Some fatal install error") raise Exception("Some fatal install error")