Compare commits

..

4 Commits

Author SHA1 Message Date
Todd Gamblin
3ad328e877 include grep executable in prefix length
Signed-off-by: Todd Gamblin <tgamblin@llnl.gov>
2025-05-19 23:30:45 -07:00
Todd Gamblin
0b0cf998b4 allow group_arguments to take a prefix length
Signed-off-by: Todd Gamblin <tgamblin@llnl.gov>
2025-05-19 17:05:46 -07:00
Todd Gamblin
52ce977b93 bugfix: Executable should record return code on CalledProcessError
Spack's `Executable` class isn't properly returning a called process's
return code when it fails with a `CalledProcessError`.  Record it before
raising a `ProcessError` so that client code can query it later.

Signed-off-by: Todd Gamblin <tgamblin@llnl.gov>
2025-05-19 16:22:56 -07:00
Todd Gamblin
be57175413 bugfix: make spack pkg grep respect windows CLI limits
`spack pkg grep` can construct command lines that are too long for Windows,
i.e. command lines that are longer than 32768 characters.

This makes `spack pkg grep` respect the Windows limit by default, and
gets the unix limit from `sysconfig`.

- [x] Add a new `spack.cmd.group_arguments` function to create CLI-safe arg groups
- [x] Default to max 500 elements or 32768 chars, whichever comes first
- [x] If sysconfig is available, get `SC_ARG_MAX` and use that for max chars
- [x] Clean up output handling in `test_pkg_grep` test
- [x] Add test for `group_arguments`

Signed-off-by: Todd Gamblin <tgamblin@llnl.gov>
2025-05-19 16:17:56 -07:00
7 changed files with 167 additions and 345 deletions

View File

@@ -67,7 +67,7 @@ def index_by(objects, *funcs):
}
If any elements in funcs is a string, it is treated as the name
of an attribute, and acts like ``getattr(object, name)``. So
of an attribute, and acts like getattr(object, name). So
shorthand for the above two indexes would be::
index1 = index_by(list_of_specs, 'arch', 'compiler')
@@ -77,8 +77,7 @@ def index_by(objects, *funcs):
index1 = index_by(list_of_specs, ('target', 'compiler'))
Keys in the resulting dict will look like ``('gcc', 'skylake')``.
Keys in the resulting dict will look like ('gcc', 'skylake').
"""
if not funcs:
return objects
@@ -316,9 +315,7 @@ def lazy_lexicographic_ordering(cls, set_hash=True):
This is a lazy version of the tuple comparison used frequently to
implement comparison in Python. Given some objects with fields, you
might use tuple keys to implement comparison, e.g.:
.. code-block:: python
might use tuple keys to implement comparison, e.g.::
class Widget:
def _cmp_key(self):
@@ -346,9 +343,7 @@ def __lt__(self):
Lazy lexicographic comparison maps the tuple comparison shown above
to generator functions. Instead of comparing based on pre-constructed
tuple keys, users of this decorator can compare using elements from a
generator. So, you'd write:
.. code-block:: python
generator. So, you'd write::
@lazy_lexicographic_ordering
class Widget:
@@ -371,38 +366,6 @@ def cd_fun():
only has to worry about writing ``_cmp_iter``, and making sure the
elements in it are also comparable.
In some cases, you may have a fast way to determine whether two
objects are equal, e.g. the ``is`` function or an already-computed
cryptographic hash. For this, you can implement your own
``_cmp_fast_eq`` function:
.. code-block:: python
@lazy_lexicographic_ordering
class Widget:
def _cmp_iter(self):
yield a
yield b
def cd_fun():
yield c
yield d
yield cd_fun
yield e
def _cmp_fast_eq(self, other):
return self is other or None
``_cmp_fast_eq`` should return:
* ``True`` if ``self`` is equal to ``other``,
* ``False`` if ``self`` is not equal to ``other``, and
* ``None`` if it's not known whether they are equal, and the full
comparison should be done.
``lazy_lexicographic_ordering`` uses ``_cmp_fast_eq`` to short-circuit
the comparison if the answer can be determined quickly. If you do not
implement it, it defaults to ``self is other or None``.
Some things to note:
* If a class already has ``__eq__``, ``__ne__``, ``__lt__``,
@@ -423,40 +386,34 @@ def _cmp_fast_eq(self, other):
if not hasattr(cls, "_cmp_iter"):
raise TypeError(f"'{cls.__name__}' doesn't define _cmp_iter().")
# get an equal operation that allows us to short-circuit comparison
# if it's not provided, default to `is`
_cmp_fast_eq = getattr(cls, "_cmp_fast_eq", lambda x, y: x is y or None)
# comparison operators are implemented in terms of lazy_eq and lazy_lt
def eq(self, other):
fast_eq = _cmp_fast_eq(self, other)
if fast_eq is not None:
return fast_eq
if self is other:
return True
return (other is not None) and lazy_eq(self._cmp_iter, other._cmp_iter)
def lt(self, other):
if _cmp_fast_eq(self, other) is True:
if self is other:
return False
return (other is not None) and lazy_lt(self._cmp_iter, other._cmp_iter)
def ne(self, other):
fast_eq = _cmp_fast_eq(self, other)
if fast_eq is not None:
return not fast_eq
if self is other:
return False
return (other is None) or not lazy_eq(self._cmp_iter, other._cmp_iter)
def gt(self, other):
if _cmp_fast_eq(self, other) is True:
if self is other:
return False
return (other is None) or lazy_lt(other._cmp_iter, self._cmp_iter)
def le(self, other):
if _cmp_fast_eq(self, other) is True:
if self is other:
return True
return (other is not None) and not lazy_lt(other._cmp_iter, self._cmp_iter)
def ge(self, other):
if _cmp_fast_eq(self, other) is True:
if self is other:
return True
return (other is None) or not lazy_lt(self._cmp_iter, other._cmp_iter)

View File

@@ -9,7 +9,7 @@
import re
import sys
from collections import Counter
from typing import List, Optional, Union
from typing import Generator, List, Optional, Sequence, Union
import llnl.string
import llnl.util.tty as tty
@@ -704,6 +704,67 @@ def first_line(docstring):
return docstring.split("\n")[0]
def group_arguments(
args: Sequence[str],
*,
max_group_size: int = 500,
prefix_length: int = 0,
max_group_length: Optional[int] = None,
) -> Generator[List[str], None, None]:
"""Splits the supplied list of arguments into groups for passing to CLI tools.
When passing CLI arguments, we need to ensure that argument lists are no longer than
the system command line size limit, and we may also need to ensure that groups are
no more than some number of arguments long.
This returns an iterator over lists of arguments that meet these constraints.
Arguments are in the same order they appeared in the original argument list.
If any argument's length is greater than the max_group_length, this will raise a
``ValueError``.
Arguments:
args: list of arguments to split into groups
max_group_size: max number of elements in any group (default 500)
prefix_length: length of any additional arguments (including spaces) to be passed before
the groups from args; default is 0 characters
max_group_length: max length in characters of a group of args when joined by " ".
    On unix, this defaults to SC_ARG_MAX from sysconf. On Windows the default is
    the max usable for CreateProcess (32,768 chars)
"""
if max_group_length is None:
max_group_length = 32768 # default to the Windows limit
if hasattr(os, "sysconf"): # sysconf is only on unix
try:
sysconf_max = os.sysconf("SC_ARG_MAX")
if sysconf_max != -1: # returns -1 if an option isn't present
max_group_length = sysconf_max
except (ValueError, OSError):
pass # keep windows default if SC_ARG_MAX isn't in sysconf_names
group: List[str] = []
grouplen, space = prefix_length, 0
for arg in args:
arglen = len(arg)
if arglen > max_group_length:
raise ValueError(f"Argument is longer than max command line size: '{arg}'")
if arglen + prefix_length > max_group_length:
raise ValueError(f"Argument with prefix is longer than max command line size: '{arg}'")
next_grouplen = grouplen + arglen + space
if len(group) == max_group_size or next_grouplen > max_group_length:
yield group
group, grouplen, space = [], prefix_length, 0
group.append(arg)
grouplen += arglen + space
space = 1 # add a space for elements 1, 2, etc. but not 0
if group:
yield group
class CommandNotFoundError(spack.error.SpackError):
"""Exception class thrown when a requested command is not recognized as
such.

View File

@@ -3,7 +3,6 @@
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import argparse
import itertools
import os
import sys
@@ -182,21 +181,23 @@ def pkg_grep(args, unknown_args):
if "GNU" in grep("--version", output=str):
grep.add_default_arg("--color=auto")
# determines number of files to grep at a time
grouper = lambda e: e[0] // 100
all_paths = spack.repo.PATH.all_package_paths()
if not all_paths:
return 0 # no packages to search
# these args start every command invocation (grep arg1 arg2 ...)
all_prefix_args = grep.exe + args.grep_args + unknown_args
prefix_length = sum(len(arg) for arg in all_prefix_args) + len(all_prefix_args)
# set up iterator and save the first group to ensure we don't end up with a group of size 1
groups = itertools.groupby(enumerate(spack.repo.PATH.all_package_paths()), grouper)
if not groups:
return 0 # no packages to search
groups = spack.cmd.group_arguments(all_paths, prefix_length=prefix_length)
# You can force GNU grep to show filenames on every line with -H, but not POSIX grep.
# POSIX grep only shows filenames when you're grepping 2 or more files. Since we
# don't know which one we're running, we ensure there are always >= 2 files by
# saving the prior group of paths and adding it to a straggling group of 1 if needed.
# This works unless somehow there is only one package in all of Spack.
_, first_group = next(groups)
prior_paths = [path for _, path in first_group]
prior_paths = next(groups)
# grep returns 1 for nothing found, 0 for something found, and > 1 for error
return_code = 1
@@ -207,9 +208,7 @@ def grep_group(paths):
grep(*all_args, fail_on_error=False)
return grep.returncode
for _, group in groups:
paths = [path for _, path in group] # extract current path group
for paths in groups:
if len(paths) == 1:
# Only the very last group can have length 1. If it does, combine
# it with the prior group to ensure more than one path is grepped.

View File

@@ -961,6 +961,7 @@ def _sort_by_dep_types(dspec: DependencySpec):
return dspec.depflag
@lang.lazy_lexicographic_ordering
class _EdgeMap(collections.abc.Mapping):
"""Represent a collection of edges (DependencySpec objects) in the DAG.
@@ -998,6 +999,21 @@ def add(self, edge: DependencySpec) -> None:
def __str__(self) -> str:
return f"{{deps: {', '.join(str(d) for d in sorted(self.values()))}}}"
def _cmp_iter(self):
for item in sorted(itertools.chain.from_iterable(self.edges.values())):
yield item
def copy(self):
"""Copies this object and returns a clone"""
clone = type(self)()
clone.store_by_child = self.store_by_child
# Copy everything from this dict into it.
for dspec in itertools.chain.from_iterable(self.values()):
clone.add(dspec.copy())
return clone
def select(
self,
*,
@@ -3769,152 +3785,26 @@ def eq_node(self, other):
"""Equality with another spec, not including dependencies."""
return (other is not None) and lang.lazy_eq(self._cmp_node, other._cmp_node)
def _cmp_fast_eq(self, other) -> Optional[bool]:
"""Short-circuit compare with other for equality, for lazy_lexicographic_ordering."""
def _cmp_iter(self):
"""Lazily yield components of self for comparison."""
for item in self._cmp_node():
yield item
# If there is ever a breaking change to hash computation, whether accidental or purposeful,
# two specs can be identical modulo DAG hash, depending on what time they were concretized
# From the perspective of many operation in Spack (database, build cache, etc) a different
# DAG hash means a different spec. Here we ensure that two otherwise identical specs, one
# serialized before the hash change and one after, are considered different.
if self is other:
return True
yield self.dag_hash() if self.concrete else None
if self.concrete and other and other.concrete:
return self.dag_hash() == other.dag_hash()
def deps():
for dep in sorted(itertools.chain.from_iterable(self._dependencies.values())):
yield dep.spec.name
yield dep.depflag
yield hash(dep.spec)
return None
def _cmp_iter(self):
"""Lazily yield components of self for comparison."""
# Spec comparison in Spack needs to be fast, so there are several cases here for
# performance. The main places we care about this are:
#
# * Abstract specs: there are lots of abstract specs in package.py files,
# which are put into metadata dictionaries and sorted during concretization
# setup. We want comparing abstract specs to be fast.
#
# * Concrete specs: concrete specs are bigger and have lots of nodes and
# edges. Because of the graph complexity, we need a full, linear time
# traversal to compare them -- that's pretty much is unavoidable. But they
# also have precomputed cryptographic hashes (dag_hash()), which we can use
# to do fast equality comparison. See _cmp_fast_eq() above for the
# short-circuit logic for hashes.
#
# A full traversal involves constructing data structures, visitor objects, etc.,
# and it can be expensive if we have to do it to compare a bunch of tiny
# abstract specs. Therefore, there are 3 cases below, which avoid calling
# `spack.traverse.traverse_edges()` unless necessary.
#
# WARNING: the cases below need to be consistent, so don't mess with this code
# unless you really know what you're doing. Be sure to keep all three consistent.
#
# All cases lazily yield:
#
# 1. A generator over nodes
# 2. A generator over canonical edges
#
# Canonical edges have consistent ids defined by breadth-first traversal order. That is,
# the root is always 0, dependencies of the root are 1, 2, 3, etc., and so on.
#
# The three cases are:
#
# 1. Spec has no dependencies
# * We can avoid any traversal logic and just yield this node's _cmp_node generator.
#
# 2. Spec has dependencies, but dependencies have no dependencies.
# * We need to sort edges, but we don't need to track visited nodes, which
# can save us the cost of setting up all the tracking data structures
# `spack.traverse` uses.
#
# 3. Spec has dependencies that have dependencies.
# * In this case, the spec is *probably* concrete. Equality comparisons
# will be short-circuited by dag_hash(), but other comparisons will need
# to lazily enumerate components of the spec. The traversal logic is
# unavoidable.
#
# TODO: consider reworking `spack.traverse` to construct fewer data structures
# and objects, as this would make all traversals faster and could eliminate the
# need for the complexity here. It was not clear at the time of writing how
# much optimization was possible in `spack.traverse`.
sorted_l1_edges = None
edge_list = None
node_ids = None
def nodes():
nonlocal sorted_l1_edges
nonlocal edge_list
nonlocal node_ids
# Level 0: root node
yield self._cmp_node # always yield the root (this node)
if not self._dependencies: # done if there are no dependencies
return
# Level 1: direct dependencies
# we can yield these in sorted order without tracking visited nodes
deps_have_deps = False
sorted_l1_edges = self.edges_to_dependencies(depflag=dt.ALL)
if len(sorted_l1_edges) > 1:
sorted_l1_edges = spack.traverse.sort_edges(sorted_l1_edges)
for edge in sorted_l1_edges:
yield edge.spec._cmp_node
if edge.spec._dependencies:
deps_have_deps = True
if not deps_have_deps: # done if level 1 specs have no dependencies
return
# Level 2: dependencies of direct dependencies
# now it's general; we need full traverse() to track visited nodes
l1_specs = [edge.spec for edge in sorted_l1_edges]
# the node_ids dict generates consistent ids based on BFS traversal order
# these are used to identify edges later
node_ids = collections.defaultdict(lambda: len(node_ids))
node_ids[id(self)] # self is 0
for spec in l1_specs:
node_ids[id(spec)] # l1 starts at 1
edge_list = []
for edge in spack.traverse.traverse_edges(
l1_specs, order="breadth", cover="edges", root=False, visited=set([0])
):
# yield each node only once, and generate a consistent id for it the
# first time it's encountered.
if id(edge.spec) not in node_ids:
yield edge.spec._cmp_node
node_ids[id(edge.spec)]
if edge.parent is None: # skip fake edge to root
continue
edge_list.append(
(
node_ids[id(edge.parent)],
node_ids[id(edge.spec)],
edge.depflag,
edge.virtuals,
)
)
def edges():
# no edges in single-node graph
if not self._dependencies:
return
# level 1 edges all start with zero
for i, edge in enumerate(sorted_l1_edges, start=1):
yield (0, i, edge.depflag, edge.virtuals)
# yield remaining edges in the order they were encountered during traversal
if edge_list:
yield from edge_list
yield nodes
yield edges
yield deps
@property
def namespace_if_anonymous(self):

View File

@@ -307,10 +307,56 @@ def test_pkg_hash(mock_packages):
assert len(output) == 1 and all(len(elt) == 32 for elt in output)
group_args = [
"/path/one.py", # 12
"/path/two.py", # 12
"/path/three.py", # 14
"/path/four.py", # 13
"/path/five.py", # 13
"/path/six.py", # 12
"/path/seven.py", # 14
"/path/eight.py", # 14
"/path/nine.py", # 13
"/path/ten.py", # 12
]
@pytest.mark.parametrize(
["max_group_size", "max_group_length", "lengths", "error"],
[
(3, 1, None, ValueError),
(3, 13, None, ValueError),
(3, 25, [2, 1, 1, 1, 1, 1, 1, 1, 1], None),
(3, 26, [2, 1, 1, 2, 1, 1, 2], None),
(3, 40, [3, 3, 2, 2], None),
(3, 43, [3, 3, 3, 1], None),
(4, 54, [4, 3, 3], None),
(4, 56, [4, 4, 2], None),
],
)
def test_group_arguments(mock_packages, max_group_size, max_group_length, lengths, error):
generator = spack.cmd.group_arguments(
group_args, max_group_size=max_group_size, max_group_length=max_group_length
)
# just check that error cases raise
if error:
with pytest.raises(ValueError):
list(generator)
return
groups = list(generator)
assert sum(groups, []) == group_args
assert [len(group) for group in groups] == lengths
assert all(
sum(len(elt) for elt in group) + (len(group) - 1) <= max_group_length for group in groups
)
@pytest.mark.skipif(not spack.cmd.pkg.get_grep(), reason="grep is not installed")
def test_pkg_grep(mock_packages, capfd):
# only splice-* mock packages have the string "splice" in them
pkg("grep", "-l", "splice", output=str)
pkg("grep", "-l", "splice")
output, _ = capfd.readouterr()
assert output.strip() == "\n".join(
spack.repo.PATH.get_pkg_class(name).module.__file__
@@ -330,12 +376,14 @@ def test_pkg_grep(mock_packages, capfd):
]
)
# ensure that this string isn't fouhnd
pkg("grep", "abcdefghijklmnopqrstuvwxyz", output=str, fail_on_error=False)
# ensure that this string isn't found
with pytest.raises(spack.main.SpackCommandError):
pkg("grep", "abcdefghijklmnopqrstuvwxyz")
assert pkg.returncode == 1
output, _ = capfd.readouterr()
assert output.strip() == ""
# ensure that we return > 1 for an error
pkg("grep", "--foobarbaz-not-an-option", output=str, fail_on_error=False)
with pytest.raises(spack.main.SpackCommandError):
pkg("grep", "--foobarbaz-not-an-option")
assert pkg.returncode == 2

View File

@@ -6,8 +6,6 @@
import pytest
import llnl.util.lang
import spack.concretize
import spack.deptypes as dt
import spack.directives
@@ -2015,137 +2013,6 @@ def test_comparison_multivalued_variants():
assert Spec("x=a") < Spec("x=a,b") < Spec("x==a,b") < Spec("x==a,b,c")
@pytest.mark.parametrize(
"specs_in_expected_order",
[
("a", "b", "c", "d", "e"),
("a@1.0", "a@2.0", "b", "c@3.0", "c@4.0"),
("a^d", "b^c", "c^b", "d^a"),
("e^a", "e^b", "e^c", "e^d"),
("e^a@1.0", "e^a@2.0", "e^a@3.0", "e^a@4.0"),
("e^a@1.0 +a", "e^a@1.0 +b", "e^a@1.0 +c", "e^a@1.0 +c"),
("a^b%c", "a^b%d", "a^b%e", "a^b%f"),
("a^b%c@1.0", "a^b%c@2.0", "a^b%c@3.0", "a^b%c@4.0"),
("a^b%c@1.0 +a", "a^b%c@1.0 +b", "a^b%c@1.0 +c", "a^b%c@1.0 +d"),
("a cflags=-O1", "a cflags=-O2", "a cflags=-O3"),
("a %cmake@1.0 ^b %cmake@2.0", "a %cmake@2.0 ^b %cmake@1.0"),
("a^b^c^d", "a^b^c^e", "a^b^c^f"),
("a^b^c^d", "a^b^c^e", "a^b^c^e", "a^b^c^f"),
("a%b%c%d", "a%b%c%e", "a%b%c%e", "a%b%c%f"),
("d.a", "c.b", "b.c", "a.d"), # names before namespaces
],
)
def test_spec_ordering(specs_in_expected_order):
specs_in_expected_order = [Spec(s) for s in specs_in_expected_order]
assert sorted(specs_in_expected_order) == specs_in_expected_order
assert sorted(reversed(specs_in_expected_order)) == specs_in_expected_order
for i in range(len(specs_in_expected_order) - 1):
lhs, rhs = specs_in_expected_order[i : i + 2]
assert lhs <= rhs
assert (lhs < rhs and lhs != rhs) or lhs == rhs
assert rhs >= lhs
assert (rhs > lhs and rhs != lhs) or rhs == lhs
EMPTY_VER = vn.VersionList(":")
EMPTY_VAR = Spec().variants
EMPTY_FLG = Spec().compiler_flags
@pytest.mark.parametrize(
"spec,expected_tuplified",
[
# simple, no dependencies
[("a"), ((("a", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),), ())],
# with some node attributes
[
("a@1.0 +foo cflags='-O3 -g'"),
(
(
(
"a",
None,
vn.VersionList(["1.0"]),
Spec("+foo").variants,
Spec("cflags='-O3 -g'").compiler_flags,
None,
None,
None,
),
),
(),
),
],
# single edge case
[
("a^b"),
(
(
("a", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
("b", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
),
((0, 1, 0, ()),),
),
],
# root with multiple deps
[
("a^b^c^d"),
(
(
("a", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
("b", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
("c", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
("d", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
),
((0, 1, 0, ()), (0, 2, 0, ()), (0, 3, 0, ())),
),
],
# root with multiple build deps
[
("a%b%c%d"),
(
(
("a", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
("b", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
("c", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
("d", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
),
((0, 1, dt.BUILD, ()), (0, 2, dt.BUILD, ()), (0, 3, dt.BUILD, ())),
),
],
# dependencies with dependencies
[
("a ^b %c %d ^e %f %g"),
(
(
("a", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
("b", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
("e", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
("c", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
("d", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
("f", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
("g", None, EMPTY_VER, EMPTY_VAR, EMPTY_FLG, None, None, None),
),
(
(0, 1, 0, ()),
(0, 2, 0, ()),
(1, 3, dt.BUILD, ()),
(1, 4, dt.BUILD, ()),
(2, 5, dt.BUILD, ()),
(2, 6, dt.BUILD, ()),
),
),
],
],
)
def test_spec_canonical_comparison_form(spec, expected_tuplified):
print()
print()
print()
assert llnl.util.lang.tuplify(Spec(spec)._cmp_iter) == expected_tuplified
def test_comparison_after_breaking_hash_change():
# We simulate a breaking change in DAG hash computation in Spack. We have two specs that are
# entirely equal modulo DAG hash. When deserializing these specs, we don't want them to compare

View File

@@ -295,11 +295,11 @@ def streamify(arg, mode):
raise ProcessError("%s: %s" % (self.exe[0], e.strerror), message)
except subprocess.CalledProcessError as e:
self.returncode = e.returncode
if fail_on_error:
raise ProcessError(
str(e),
"\nExit status %d when invoking command: %s"
% (proc.returncode, cmd_line_string),
f"\nExit status {e.returncode} when invoking command: {cmd_line_string}",
)
except subprocess.TimeoutExpired as te:
proc.kill()