added no_op state tracking

This commit is contained in:
kshea21 2025-01-28 14:50:00 -08:00 committed by Gregory Becker
parent 1a19c09c55
commit 974d10f32b
No known key found for this signature in database
GPG Key ID: 2362541F6D14ED84

View File

@ -118,7 +118,9 @@ class ExecuteResult(enum.Enum):
FAILED = enum.auto() FAILED = enum.auto()
# Task is missing build spec and will be requeued # Task is missing build spec and will be requeued
MISSING_BUILD_SPEC = enum.auto() MISSING_BUILD_SPEC = enum.auto()
# Task is installed upstream/external or
# task is not ready for installation (locked by another process)
NO_OP = enum.auto()
class InstallAction(enum.Enum): class InstallAction(enum.Enum):
#: Don't perform an install #: Don't perform an install
@ -1144,8 +1146,9 @@ def priority(self):
class BuildTask(Task): class BuildTask(Task):
"""Class for representing a build task for a package.""" """Class for representing a build task for a package."""
process_handle: spack.build_environment.ProcessHandle process_handle: spack.build_environment.ProcessHandle = None
started: bool = False started: bool = False
no_op: bool = False
def start(self): def start(self):
"""Attempt to use the binary cache to install """Attempt to use the binary cache to install
@ -1175,6 +1178,11 @@ def start(self):
else: else:
tty.msg(f"No binary for {pkg_id} found: installing from source") tty.msg(f"No binary for {pkg_id} found: installing from source")
# If an error result has already been recorded, return without starting a new process
if self.error_result is not None:
print("got to the start error handling !!! !! !!!")
return
# Create stage object now and let it be serialized for the child process. That # Create stage object now and let it be serialized for the child process. That
# way monkeypatch in tests works correctly. # way monkeypatch in tests works correctly.
pkg.stage pkg.stage
@ -1191,16 +1199,18 @@ def start(self):
def poll(self): def poll(self):
"""Check if task has successfully executed, caused an InstallError, """Check if task has successfully executed, caused an InstallError,
or the child process has information ready to receive.""" or the child process has information ready to receive."""
assert self.started, "Can't call `poll()` before `start()`" assert self.started or self.no_op, "Can't call `poll()` before `start()` or identified no-operation task"
return self.success_result or self.error_result or self.process_handle.poll() return self.no_op or self.success_result or self.error_result or self.process_handle.poll()
def complete(self): def complete(self):
""" """
Complete the installation of the requested spec and/or dependency Complete the installation of the requested spec and/or dependency
represented by the build task. represented by the build task.
""" """
assert self.started, "Can't call `complete()` before `start()`" assert self.started or self.no_op, "Can't call `complete()` before `start()` or identified no-operation task"
# TODO: see whether this assertion can be removed. In some cases the task must
# still be completed so that its saved error can be raised, even though no process
# was ever started (saving the error made the function exit before one could start).
install_args = self.request.install_args install_args = self.request.install_args
pkg = self.pkg pkg = self.pkg
tests = install_args.get("tests") tests = install_args.get("tests")
@ -1208,17 +1218,22 @@ def complete(self):
self.status = BuildStatus.INSTALLING self.status = BuildStatus.INSTALLING
pkg.run_tests = tests is True or tests and pkg.name in tests pkg.run_tests = tests is True or tests and pkg.name in tests
# If the task has been identified as a no-operation task,
# return ExecuteResult.NO_OP
if self.no_op:
return ExecuteResult.NO_OP
# If installing a package from binary cache is successful, # If installing a package from binary cache is successful,
# return ExecuteResult.SUCCESS # return ExecuteResult.SUCCESS
if self.success_result is not None: if self.success_result is not None:
return self.success_result return self.success_result
# If installing a package from binary cache raises an error, # If an error arises from installing a package,
# raise spack.error.InstallError # raise spack.error.InstallError
if self.error_result is not None: if self.error_result is not None:
raise self.error_result raise self.error_result
# hook that allows tests to inspect the Package before installation # hook that allows tests to inspect the Package before installation
# see unit_test_check() docs. # see unit_test_check() docs.
if not pkg.unit_test_check(): if not pkg.unit_test_check():
@ -1815,6 +1830,8 @@ def _complete_task(self, task: Task, install_status: InstallStatus) -> None:
rc = task.complete() rc = task.complete()
if rc == ExecuteResult.MISSING_BUILD_SPEC: if rc == ExecuteResult.MISSING_BUILD_SPEC:
self._requeue_with_build_spec_tasks(task) self._requeue_with_build_spec_tasks(task)
elif rc == ExecuteResult.NO_OP:
pass
else: # if rc == ExecuteResult.SUCCESS or rc == ExecuteResult.FAILED else: # if rc == ExecuteResult.SUCCESS or rc == ExecuteResult.FAILED
self._update_installed(task) self._update_installed(task)
@ -2122,6 +2139,7 @@ def start_task(task) -> None:
if _handle_external_and_upstream(pkg, task.explicit): if _handle_external_and_upstream(pkg, task.explicit):
term_status.clear() term_status.clear()
self._flag_installed(pkg, task.dependents) self._flag_installed(pkg, task.dependents)
task.no_op = True
return return
# Flag a failed spec. Do not need an (install) prefix lock since # Flag a failed spec. Do not need an (install) prefix lock since
@ -2132,9 +2150,7 @@ def start_task(task) -> None:
self._update_failed(task) self._update_failed(task)
if self.fail_fast: if self.fail_fast:
raise spack.error.InstallError(fail_fast_err, pkg=pkg) task.error_result = spack.error.InstallError(fail_fast_err, pkg=pkg)
return
# Attempt to get a write lock. If we can't get the lock then # Attempt to get a write lock. If we can't get the lock then
# another process is likely (un)installing the spec or has # another process is likely (un)installing the spec or has
@ -2153,6 +2169,7 @@ def start_task(task) -> None:
# -- failed, installed, or uninstalled -- on the next pass. # -- failed, installed, or uninstalled -- on the next pass.
if lock is None: if lock is None:
self._requeue_task(task, install_status) self._requeue_task(task, install_status)
task.no_op = True
return return
term_status.clear() term_status.clear()
@ -2187,6 +2204,7 @@ def start_task(task) -> None:
# or uninstalled -- on the next pass. # or uninstalled -- on the next pass.
self.installed.remove(pkg_id) self.installed.remove(pkg_id)
self._requeue_task(task, install_status) self._requeue_task(task, install_status)
task.no_op = True
return return
# Having a read lock on an uninstalled pkg may mean another # Having a read lock on an uninstalled pkg may mean another
@ -2200,6 +2218,7 @@ def start_task(task) -> None:
if ltype == "read": if ltype == "read":
lock.release_read() lock.release_read()
self._requeue_task(task, install_status) self._requeue_task(task, install_status)
task.no_op = True
return return
# Proceed with the installation since we have an exclusive write # Proceed with the installation since we have an exclusive write
@ -2211,7 +2230,6 @@ def start_task(task) -> None:
# Start a child process for a task that's ready to be installed. # Start a child process for a task that's ready to be installed.
task.start() task.start()
tty.msg(install_msg(pkg_id, self.pid, install_status)) tty.msg(install_msg(pkg_id, self.pid, install_status))
active_tasks.append(task)
elif action == InstallAction.OVERWRITE: elif action == InstallAction.OVERWRITE:
# spack.store.STORE.db is not really a Database object, but a small # spack.store.STORE.db is not really a Database object, but a small
# wrapper -- silence mypy # wrapper -- silence mypy
@ -2226,12 +2244,8 @@ def complete_task(task) -> None:
keep_prefix = install_args.get("keep_prefix") keep_prefix = install_args.get("keep_prefix")
action = self._install_action(task) action = self._install_action(task)
try: try:
try: self._complete_task(task, install_status)
self._complete_task(task, install_status)
finally:
# Remove task from active_tasks on error or success
active_tasks.remove(task)
# If we installed then we should keep the prefix # If we installed then we should keep the prefix
stop_before_phase = getattr(pkg, "stop_before_phase", None) stop_before_phase = getattr(pkg, "stop_before_phase", None)
last_phase = getattr(pkg, "last_phase", None) last_phase = getattr(pkg, "last_phase", None)
@ -2305,6 +2319,7 @@ def complete_task(task) -> None:
# no ready tasks # no ready tasks
break break
active_tasks.append(task)
try: try:
# Attempt to start the task's package installation # Attempt to start the task's package installation
start_task(task) start_task(task)
@ -2315,10 +2330,24 @@ def complete_task(task) -> None:
time.sleep(0.1) time.sleep(0.1)
# Check if any tasks have completed and add to list # Check if any tasks have completed and add to list
#for task in active_tasks:
# print("what are the tasks",task)
done = [task for task in active_tasks if task.poll()] done = [task for task in active_tasks if task.poll()]
# Iterate through the done tasks and complete them # Iterate through the done tasks and complete them
for task in done: for task in done:
complete_task(task) try:
complete_task(task)
except:
# Terminate any active child processes if there's an installation error
for task in active_tasks:
print("terminate active tasks for loop")
if task.process_handle is not None:
print("are we trying to shut down a tangential active process")
task.process_handle.terminate_processes()
raise
finally:
active_tasks.remove(task)
self._clear_removed_tasks() self._clear_removed_tasks()
if self.build_pq: if self.build_pq: