From 974d10f32b584aabc4b777607f41a149e6618ecf Mon Sep 17 00:00:00 2001 From: kshea21 Date: Tue, 28 Jan 2025 14:50:00 -0800 Subject: [PATCH] added no_op state tracking --- lib/spack/spack/installer.py | 67 ++++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 19 deletions(-) diff --git a/lib/spack/spack/installer.py b/lib/spack/spack/installer.py index 1823c34b8a6..2aae65506f1 100644 --- a/lib/spack/spack/installer.py +++ b/lib/spack/spack/installer.py @@ -118,7 +118,9 @@ class ExecuteResult(enum.Enum): FAILED = enum.auto() # Task is missing build spec and will be requeued MISSING_BUILD_SPEC = enum.auto() - + # Task is installed upstream/external or + # task is not ready for installation (locked by another process) + NO_OP = enum.auto() class InstallAction(enum.Enum): #: Don't perform an install @@ -1144,8 +1146,9 @@ def priority(self): class BuildTask(Task): """Class for representing a build task for a package.""" - process_handle: spack.build_environment.ProcessHandle + process_handle: spack.build_environment.ProcessHandle = None started: bool = False + no_op: bool = False def start(self): """Attempt to use the binary cache to install @@ -1175,6 +1178,11 @@ def start(self): else: tty.msg(f"No binary for {pkg_id} found: installing from source") + # if there's an error result, don't start a new process, and leave + if self.error_result is not None: + print("got to the start error handling !!! !! !!!") + return + # Create stage object now and let it be serialized for the child process. That # way monkeypatch in tests works correctly. pkg.stage @@ -1191,16 +1199,18 @@ def start(self): def poll(self): """Check if task has successfully executed, caused an InstallError, or the child process has information ready to receive.""" - assert self.started, "Can't call `poll()` before `start()`" - return self.success_result or self.error_result or self.process_handle.poll() + assert self.started or self.no_op, "Can't call `poll()` before `start()` or identified no-operation task" + return self.no_op or self.success_result or self.error_result or self.process_handle.poll() def complete(self): """ Complete the installation of the requested spec and/or dependency represented by the build task. """ - assert self.started, "Can't call `complete()` before `start()`" - + assert self.started or self.no_op, "Can't call `complete()` before `start()` or identified no-operation task" + # seeing if I can get rid of that assertion because in some cases we will need to + # complete the task so that we can raise the errors even though a process was never + # started because saving the error made it exit the function before a process could be started install_args = self.request.install_args pkg = self.pkg tests = install_args.get("tests") @@ -1208,17 +1218,22 @@ def complete(self): self.status = BuildStatus.INSTALLING pkg.run_tests = tests is True or tests and pkg.name in tests + + # If task has been identified as a no operation, + # return ExecuteResult.NOOP + if self.no_op: + return ExecuteResult.NO_OP # If installing a package from binary cache is successful, # return ExecuteResult.SUCCESS if self.success_result is not None: return self.success_result - # If installing a package from binary cache raises an error, + # If an error arises from installing a package, # raise spack.error.InstallError if self.error_result is not None: raise self.error_result - + # hook that allows tests to inspect the Package before installation # see unit_test_check() docs. if not pkg.unit_test_check(): @@ -1815,6 +1830,8 @@ def _complete_task(self, task: Task, install_status: InstallStatus) -> None: rc = task.complete() if rc == ExecuteResult.MISSING_BUILD_SPEC: self._requeue_with_build_spec_tasks(task) + elif rc == ExecuteResult.NO_OP: + pass else: # if rc == ExecuteResult.SUCCESS or rc == ExecuteResult.FAILED self._update_installed(task) @@ -2122,6 +2139,7 @@ def start_task(task) -> None: if _handle_external_and_upstream(pkg, task.explicit): term_status.clear() self._flag_installed(pkg, task.dependents) + task.no_op = True return # Flag a failed spec. Do not need an (install) prefix lock since @@ -2132,9 +2150,7 @@ def start_task(task) -> None: self._update_failed(task) if self.fail_fast: - raise spack.error.InstallError(fail_fast_err, pkg=pkg) - - return + task.error_result = spack.error.InstallError(fail_fast_err, pkg=pkg) # Attempt to get a write lock. If we can't get the lock then # another process is likely (un)installing the spec or has @@ -2153,6 +2169,7 @@ def start_task(task) -> None: # -- failed, installed, or uninstalled -- on the next pass. if lock is None: self._requeue_task(task, install_status) + task.no_op = True return term_status.clear() @@ -2187,6 +2204,7 @@ def start_task(task) -> None: # or uninstalled -- on the next pass. self.installed.remove(pkg_id) self._requeue_task(task, install_status) + task.no_op = True return # Having a read lock on an uninstalled pkg may mean another @@ -2200,6 +2218,7 @@ def start_task(task) -> None: if ltype == "read": lock.release_read() self._requeue_task(task, install_status) + task.no_op = True return # Proceed with the installation since we have an exclusive write @@ -2211,7 +2230,6 @@ def start_task(task) -> None: # Start a child process for a task that's ready to be installed. task.start() tty.msg(install_msg(pkg_id, self.pid, install_status)) - active_tasks.append(task) elif action == InstallAction.OVERWRITE: # spack.store.STORE.db is not really a Database object, but a small # wrapper -- silence mypy @@ -2226,12 +2244,8 @@ def complete_task(task) -> None: keep_prefix = install_args.get("keep_prefix") action = self._install_action(task) try: - try: - self._complete_task(task, install_status) - finally: - # Remove task from active_tasks on error or success - active_tasks.remove(task) - + self._complete_task(task, install_status) + # If we installed then we should keep the prefix stop_before_phase = getattr(pkg, "stop_before_phase", None) last_phase = getattr(pkg, "last_phase", None) @@ -2305,6 +2319,7 @@ def complete_task(task) -> None: # no ready tasks break + active_tasks.append(task) try: # Attempt to start the task's package installation start_task(task) @@ -2315,10 +2330,24 @@ def complete_task(task) -> None: time.sleep(0.1) # Check if any tasks have completed and add to list + #for task in active_tasks: + # print("what are the tasks",task) done = [task for task in active_tasks if task.poll()] # Iterate through the done tasks and complete them for task in done: - complete_task(task) + try: + complete_task(task) + except: + # Terminate any active child processes if there's an installation error + for task in active_tasks: + print("terminate active tasks for loop") + if task.process_handle is not None: + print("are we trying to shut down a tangential active process") + task.process_handle.terminate_processes() + raise + finally: + active_tasks.remove(task) + self._clear_removed_tasks() if self.build_pq: