Bugfix/Installer: properly track task queueing (#46293)

* Bugfix/Installer: properly track task queueing
* Move ordinal() to llnl.string; change time to attempt
* Convert BuildTask to use kwargs (after pkg); convert STATUS_ to BuildStats enum
* BuildTask: instantiate with keyword only args after the request
* Installer: build request is required for initializing task
* Installer: only the initial BuildTask cannnot have status REMOVED
* Change queueing check
* ordinal(): simplify suffix determination [tgamblin]
* BuildStatus: ADDED -> QUEUED [becker33]
* BuildTask: clarify TypeError for 'installed' argument
This commit is contained in:
Tamara Dahlgren 2024-10-07 10:42:09 -07:00 committed by GitHub
parent f5135018dd
commit c77916146c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 122 additions and 65 deletions

View File

@ -41,6 +41,20 @@ def comma_and(sequence: List[str]) -> str:
return comma_list(sequence, "and") return comma_list(sequence, "and")
def ordinal(number: int) -> str:
"""Return the ordinal representation (1st, 2nd, 3rd, etc.) for the provided number.
Args:
number: int to convert to ordinal number
Returns: number's corresponding ordinal
"""
idx = (number % 10) << 1
tens = number % 100 // 10
suffix = "th" if tens == 1 or idx > 6 else "thstndrd"[idx : idx + 2]
return f"{number}{suffix}"
def quote(sequence: List[str], q: str = "'") -> List[str]: def quote(sequence: List[str], q: str = "'") -> List[str]:
"""Quotes each item in the input list with the quote character passed as second argument.""" """Quotes each item in the input list with the quote character passed as second argument."""
return [f"{q}{e}{q}" for e in sequence] return [f"{q}{e}{q}" for e in sequence]

View File

@ -27,6 +27,7 @@
""" """
import copy import copy
import enum
import glob import glob
import heapq import heapq
import io import io
@ -42,6 +43,7 @@
import llnl.util.filesystem as fs import llnl.util.filesystem as fs
import llnl.util.lock as lk import llnl.util.lock as lk
import llnl.util.tty as tty import llnl.util.tty as tty
from llnl.string import ordinal
from llnl.util.lang import pretty_seconds from llnl.util.lang import pretty_seconds
from llnl.util.tty.color import colorize from llnl.util.tty.color import colorize
from llnl.util.tty.log import log_output from llnl.util.tty.log import log_output
@ -70,25 +72,32 @@
#: were added (see https://docs.python.org/2/library/heapq.html). #: were added (see https://docs.python.org/2/library/heapq.html).
_counter = itertools.count(0) _counter = itertools.count(0)
#: Build status indicating task has been added.
STATUS_ADDED = "queued"
#: Build status indicating the spec failed to install class BuildStatus(enum.Enum):
STATUS_FAILED = "failed" """Different build (task) states."""
#: Build status indicating the spec is being installed (possibly by another #: Build status indicating task has been added/queued.
#: process) QUEUED = enum.auto()
STATUS_INSTALLING = "installing"
#: Build status indicating the spec was sucessfully installed #: Build status indicating the spec failed to install
STATUS_INSTALLED = "installed" FAILED = enum.auto()
#: Build status indicating the task has been popped from the queue #: Build status indicating the spec is being installed (possibly by another
STATUS_DEQUEUED = "dequeued" #: process)
INSTALLING = enum.auto()
#: Build status indicating task has been removed (to maintain priority #: Build status indicating the spec was sucessfully installed
#: queue invariants). INSTALLED = enum.auto()
STATUS_REMOVED = "removed"
#: Build status indicating the task has been popped from the queue
DEQUEUED = enum.auto()
#: Build status indicating task has been removed (to maintain priority
#: queue invariants).
REMOVED = enum.auto()
def __str__(self):
return f"{self.name.lower()}"
def _write_timer_json(pkg, timer, cache): def _write_timer_json(pkg, timer, cache):
@ -846,31 +855,38 @@ class BuildTask:
def __init__( def __init__(
self, self,
pkg: "spack.package_base.PackageBase", pkg: "spack.package_base.PackageBase",
request: Optional[BuildRequest], request: BuildRequest,
compiler: bool, *,
start: float, compiler: bool = False,
attempts: int, start: float = 0.0,
status: str, attempts: int = 0,
installed: Set[str], status: BuildStatus = BuildStatus.QUEUED,
installed: Set[str] = set(),
): ):
""" """
Instantiate a build task for a package. Instantiate a build task for a package.
Args: Args:
pkg: the package to be built and installed pkg: the package to be built and installed and whose spec is
request: the associated install request where ``None`` can be concrete
used to indicate the package was explicitly requested by the user request: the associated install request
compiler: whether task is for a bootstrap compiler compiler: whether task is for a bootstrap compiler
start: the initial start time for the package, in seconds start: the initial start time for the package, in seconds
attempts: the number of attempts to install the package attempts: the number of attempts to install the package, which
should be 0 when the task is initially instantiated
status: the installation status status: the installation status
installed: the identifiers of packages that have installed: the (string) identifiers of packages that have
been installed so far been installed so far
Raises:
``InstallError`` if the build status is incompatible with the task
``TypeError`` if provided an argument of the wrong type
``ValueError`` if provided an argument with the wrong value or state
""" """
# Ensure dealing with a package that has a concrete spec # Ensure dealing with a package that has a concrete spec
if not isinstance(pkg, spack.package_base.PackageBase): if not isinstance(pkg, spack.package_base.PackageBase):
raise ValueError(f"{str(pkg)} must be a package") raise TypeError(f"{str(pkg)} must be a package")
self.pkg = pkg self.pkg = pkg
if not self.pkg.spec.concrete: if not self.pkg.spec.concrete:
@ -881,18 +897,20 @@ def __init__(
# The explicit build request associated with the package # The explicit build request associated with the package
if not isinstance(request, BuildRequest): if not isinstance(request, BuildRequest):
raise ValueError(f"{str(pkg)} must have a build request") raise TypeError(f"{request} is not a valid build request")
self.request = request self.request = request
# Initialize the status to an active state. The status is used to # Initialize the status to an active state. The status is used to
# ensure priority queue invariants when tasks are "removed" from the # ensure priority queue invariants when tasks are "removed" from the
# queue. # queue.
if status == STATUS_REMOVED: if not isinstance(status, BuildStatus):
raise TypeError(f"{status} is not a valid build status")
# The initial build task cannot have status "removed".
if attempts == 0 and status == BuildStatus.REMOVED:
raise spack.error.InstallError( raise spack.error.InstallError(
f"Cannot create a build task for {self.pkg_id} with status '{status}'", pkg=pkg f"Cannot create a build task for {self.pkg_id} with status '{status}'", pkg=pkg
) )
self.status = status self.status = status
# Package is associated with a bootstrap compiler # Package is associated with a bootstrap compiler
@ -901,6 +919,12 @@ def __init__(
# The initial start time for processing the spec # The initial start time for processing the spec
self.start = start self.start = start
if not isinstance(installed, set):
raise TypeError(
f"BuildTask constructor requires 'installed' be a 'set', "
f"not '{installed.__class__.__name__}'."
)
# Set of dependents, which needs to include the requesting package # Set of dependents, which needs to include the requesting package
# to support tracking of parallel, multi-spec, environment installs. # to support tracking of parallel, multi-spec, environment installs.
self.dependents = set(get_dependent_ids(self.pkg.spec)) self.dependents = set(get_dependent_ids(self.pkg.spec))
@ -922,13 +946,12 @@ def __init__(
# List of uninstalled dependencies, which is used to establish # List of uninstalled dependencies, which is used to establish
# the priority of the build task. # the priority of the build task.
#
self.uninstalled_deps = set( self.uninstalled_deps = set(
pkg_id for pkg_id in self.dependencies if pkg_id not in installed pkg_id for pkg_id in self.dependencies if pkg_id not in installed
) )
# Ensure key sequence-related properties are updated accordingly. # Ensure key sequence-related properties are updated accordingly.
self.attempts = 0 self.attempts = attempts
self._update() self._update()
def __eq__(self, other): def __eq__(self, other):
@ -1180,7 +1203,7 @@ def __str__(self) -> str:
def _add_init_task( def _add_init_task(
self, self,
pkg: "spack.package_base.PackageBase", pkg: "spack.package_base.PackageBase",
request: Optional[BuildRequest], request: BuildRequest,
is_compiler: bool, is_compiler: bool,
all_deps: Dict[str, Set[str]], all_deps: Dict[str, Set[str]],
) -> None: ) -> None:
@ -1189,14 +1212,17 @@ def _add_init_task(
Args: Args:
pkg: the package to be built and installed pkg: the package to be built and installed
request (BuildRequest or None): the associated install request request: the associated install request
where ``None`` can be used to indicate the package was is_compiler: whether task is for a bootstrap compiler
explicitly requested by the user all_deps: dictionary of all dependencies and associated dependents
is_compiler (bool): whether task is for a bootstrap compiler
all_deps (defaultdict(set)): dictionary of all dependencies and
associated dependents
""" """
task = BuildTask(pkg, request, is_compiler, 0, 0, STATUS_ADDED, self.installed) task = BuildTask(
pkg,
request=request,
compiler=is_compiler,
status=BuildStatus.QUEUED,
installed=self.installed,
)
for dep_id in task.dependencies: for dep_id in task.dependencies:
all_deps[dep_id].add(package_id(pkg.spec)) all_deps[dep_id].add(package_id(pkg.spec))
@ -1514,7 +1540,7 @@ def _add_tasks(self, request: BuildRequest, all_deps):
dep_id = package_id(dep) dep_id = package_id(dep)
if dep_id not in self.build_tasks: if dep_id not in self.build_tasks:
self._add_init_task(dep_pkg, request, False, all_deps) self._add_init_task(dep_pkg, request, is_compiler=False, all_deps=all_deps)
# Clear any persistent failure markings _unless_ they are # Clear any persistent failure markings _unless_ they are
# associated with another process in this parallel build # associated with another process in this parallel build
@ -1532,7 +1558,7 @@ def _add_tasks(self, request: BuildRequest, all_deps):
self._check_deps_status(request) self._check_deps_status(request)
# Now add the package itself, if appropriate # Now add the package itself, if appropriate
self._add_init_task(request.pkg, request, False, all_deps) self._add_init_task(request.pkg, request, is_compiler=False, all_deps=all_deps)
# Ensure if one request is to fail fast then all requests will. # Ensure if one request is to fail fast then all requests will.
fail_fast = bool(request.install_args.get("fail_fast")) fail_fast = bool(request.install_args.get("fail_fast"))
@ -1559,7 +1585,7 @@ def _install_task(self, task: BuildTask, install_status: InstallStatus) -> None:
tty.msg(install_msg(pkg_id, self.pid, install_status)) tty.msg(install_msg(pkg_id, self.pid, install_status))
task.start = task.start or time.time() task.start = task.start or time.time()
task.status = STATUS_INSTALLING task.status = BuildStatus.INSTALLING
# Use the binary cache if requested # Use the binary cache if requested
if use_cache: if use_cache:
@ -1623,9 +1649,9 @@ def _pop_task(self) -> Optional[BuildTask]:
""" """
while self.build_pq: while self.build_pq:
task = heapq.heappop(self.build_pq)[1] task = heapq.heappop(self.build_pq)[1]
if task.status != STATUS_REMOVED: if task.status != BuildStatus.REMOVED:
del self.build_tasks[task.pkg_id] del self.build_tasks[task.pkg_id]
task.status = STATUS_DEQUEUED task.status = BuildStatus.DEQUEUED
return task return task
return None return None
@ -1654,7 +1680,9 @@ def _push_task(self, task: BuildTask) -> None:
# Remove any associated build task since its sequence will change # Remove any associated build task since its sequence will change
self._remove_task(task.pkg_id) self._remove_task(task.pkg_id)
desc = "Queueing" if task.attempts == 0 else "Requeueing" desc = (
"Queueing" if task.attempts == 1 else f"Requeueing ({ordinal(task.attempts)} attempt)"
)
tty.debug(msg.format(desc, task.pkg_id, task.status)) tty.debug(msg.format(desc, task.pkg_id, task.status))
# Now add the new task to the queue with a new sequence number to # Now add the new task to the queue with a new sequence number to
@ -1698,7 +1726,7 @@ def _remove_task(self, pkg_id: str) -> Optional[BuildTask]:
if pkg_id in self.build_tasks: if pkg_id in self.build_tasks:
tty.debug(f"Removing build task for {pkg_id} from list") tty.debug(f"Removing build task for {pkg_id} from list")
task = self.build_tasks.pop(pkg_id) task = self.build_tasks.pop(pkg_id)
task.status = STATUS_REMOVED task.status = BuildStatus.REMOVED
return task return task
else: else:
return None return None
@ -1710,14 +1738,14 @@ def _requeue_task(self, task: BuildTask, install_status: InstallStatus) -> None:
Args: Args:
task (BuildTask): the installation build task for a package task (BuildTask): the installation build task for a package
""" """
if task.status not in [STATUS_INSTALLED, STATUS_INSTALLING]: if task.status not in [BuildStatus.INSTALLED, BuildStatus.INSTALLING]:
tty.debug( tty.debug(
f"{install_msg(task.pkg_id, self.pid, install_status)} " f"{install_msg(task.pkg_id, self.pid, install_status)} "
"in progress by another process" "in progress by another process"
) )
new_task = task.next_attempt(self.installed) new_task = task.next_attempt(self.installed)
new_task.status = STATUS_INSTALLING new_task.status = BuildStatus.INSTALLING
self._push_task(new_task) self._push_task(new_task)
def _setup_install_dir(self, pkg: "spack.package_base.PackageBase") -> None: def _setup_install_dir(self, pkg: "spack.package_base.PackageBase") -> None:
@ -1772,7 +1800,7 @@ def _update_failed(
self.failed[pkg_id] = spack.store.STORE.failure_tracker.mark(task.pkg.spec) self.failed[pkg_id] = spack.store.STORE.failure_tracker.mark(task.pkg.spec)
else: else:
self.failed[pkg_id] = None self.failed[pkg_id] = None
task.status = STATUS_FAILED task.status = BuildStatus.FAILED
for dep_id in task.dependents: for dep_id in task.dependents:
if dep_id in self.build_tasks: if dep_id in self.build_tasks:
@ -1792,7 +1820,7 @@ def _update_installed(self, task: BuildTask) -> None:
Args: Args:
task (BuildTask): the build task for the installed package task (BuildTask): the build task for the installed package
""" """
task.status = STATUS_INSTALLED task.status = BuildStatus.INSTALLED
self._flag_installed(task.pkg, task.dependents) self._flag_installed(task.pkg, task.dependents)
def _flag_installed( def _flag_installed(

View File

@ -12,22 +12,37 @@
def test_build_task_errors(install_mockery): def test_build_task_errors(install_mockery):
with pytest.raises(ValueError, match="must be a package"): """Check expected errors when instantiating a BuildTask."""
inst.BuildTask("abc", None, False, 0, 0, 0, set())
spec = spack.spec.Spec("trivial-install-test-package") spec = spack.spec.Spec("trivial-install-test-package")
pkg_cls = spack.repo.PATH.get_pkg_class(spec.name) pkg_cls = spack.repo.PATH.get_pkg_class(spec.name)
with pytest.raises(ValueError, match="must have a concrete spec"):
inst.BuildTask(pkg_cls(spec), None, False, 0, 0, 0, set())
# The value of the request argument is expected to not be checked.
for pkg in [None, "abc"]:
with pytest.raises(TypeError, match="must be a package"):
inst.BuildTask(pkg, None)
with pytest.raises(ValueError, match="must have a concrete spec"):
inst.BuildTask(pkg_cls(spec), None)
# Using a concretized package now means the request argument is checked.
spec.concretize() spec.concretize()
assert spec.concrete assert spec.concrete
with pytest.raises(ValueError, match="must have a build request"): with pytest.raises(TypeError, match="is not a valid build request"):
inst.BuildTask(spec.package, None, False, 0, 0, 0, set()) inst.BuildTask(spec.package, None)
# Using a valid package and spec, the next check is the status argument.
request = inst.BuildRequest(spec.package, {}) request = inst.BuildRequest(spec.package, {})
with pytest.raises(TypeError, match="is not a valid build status"):
inst.BuildTask(spec.package, request, status="queued")
# Now we can check that build tasks cannot be create when the status
# indicates the task is/should've been removed.
with pytest.raises(spack.error.InstallError, match="Cannot create a build task"): with pytest.raises(spack.error.InstallError, match="Cannot create a build task"):
inst.BuildTask(spec.package, request, False, 0, 0, inst.STATUS_REMOVED, set()) inst.BuildTask(spec.package, request, status=inst.BuildStatus.REMOVED)
# Also make sure to not accept an incompatible installed argument value.
with pytest.raises(TypeError, match="'installed' be a 'set', not 'str'"):
inst.BuildTask(spec.package, request, installed="mpileaks")
def test_build_task_basics(install_mockery): def test_build_task_basics(install_mockery):
@ -37,7 +52,7 @@ def test_build_task_basics(install_mockery):
# Ensure key properties match expectations # Ensure key properties match expectations
request = inst.BuildRequest(spec.package, {}) request = inst.BuildRequest(spec.package, {})
task = inst.BuildTask(spec.package, request, False, 0, 0, inst.STATUS_ADDED, set()) task = inst.BuildTask(spec.package, request=request, status=inst.BuildStatus.QUEUED)
assert not task.explicit assert not task.explicit
assert task.priority == len(task.uninstalled_deps) assert task.priority == len(task.uninstalled_deps)
assert task.key == (task.priority, task.sequence) assert task.key == (task.priority, task.sequence)
@ -59,16 +74,16 @@ def test_build_task_strings(install_mockery):
# Ensure key properties match expectations # Ensure key properties match expectations
request = inst.BuildRequest(spec.package, {}) request = inst.BuildRequest(spec.package, {})
task = inst.BuildTask(spec.package, request, False, 0, 0, inst.STATUS_ADDED, set()) task = inst.BuildTask(spec.package, request=request, status=inst.BuildStatus.QUEUED)
# Cover __repr__ # Cover __repr__
irep = task.__repr__() irep = task.__repr__()
assert irep.startswith(task.__class__.__name__) assert irep.startswith(task.__class__.__name__)
assert "status='queued'" in irep # == STATUS_ADDED assert "BuildStatus.QUEUED" in irep
assert "sequence=" in irep assert "sequence=" in irep
# Cover __str__ # Cover __str__
istr = str(task) istr = str(task)
assert "status=queued" in istr # == STATUS_ADDED assert "status=queued" in istr # == BuildStatus.QUEUED
assert "#dependencies=1" in istr assert "#dependencies=1" in istr
assert "priority=" in istr assert "priority=" in istr

View File

@ -73,7 +73,7 @@ def create_build_task(
pkg: spack.package_base.PackageBase, install_args: Optional[dict] = None pkg: spack.package_base.PackageBase, install_args: Optional[dict] = None
) -> inst.BuildTask: ) -> inst.BuildTask:
request = inst.BuildRequest(pkg, {} if install_args is None else install_args) request = inst.BuildRequest(pkg, {} if install_args is None else install_args)
return inst.BuildTask(pkg, request, False, 0, 0, inst.STATUS_ADDED, set()) return inst.BuildTask(pkg, request=request, status=inst.BuildStatus.QUEUED)
def create_installer( def create_installer(
@ -698,7 +698,7 @@ def test_requeue_task(install_mockery, capfd):
ids = list(installer.build_tasks) ids = list(installer.build_tasks)
assert len(ids) == 1 assert len(ids) == 1
qtask = installer.build_tasks[ids[0]] qtask = installer.build_tasks[ids[0]]
assert qtask.status == inst.STATUS_INSTALLING assert qtask.status == inst.BuildStatus.INSTALLING
assert qtask.sequence > task.sequence assert qtask.sequence > task.sequence
assert qtask.attempts == task.attempts + 1 assert qtask.attempts == task.attempts + 1