spack buildcache push: best effort (#45631)

"spack buildcache push" for partially installed environments pushes all it 
can by default, and only dumps errors towards the end.

If --fail-fast is provided, error out before pushing anything if any
of the packages is uninstalled

oci build caches using parallel push now use futures to ensure pushing
goes in best-effort style.
Harmen Stoppels 2024-08-13 08:12:48 +02:00 committed by GitHub
parent 6b73f00310
commit a66586d749
9 changed files with 334 additions and 174 deletions
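
The pattern the commit message describes reduces to a few lines of
concurrent.futures. The sketch below is illustrative only, not code from
this commit; "upload" and "items" are placeholder names:

    import concurrent.futures

    def push_all_best_effort(executor, items, upload):
        # Submit every upload up front; one failure does not cancel the rest.
        futures = [executor.submit(upload, item) for item in items]
        concurrent.futures.wait(futures)
        # Collect errors afterwards instead of raising on the first one.
        errors = [(i, f.exception()) for i, f in zip(items, futures) if f.exception()]
        pushed = [i for i, f in zip(items, futures) if f.exception() is None]
        return pushed, errors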

lib/spack/docs/conf.py

@@ -206,6 +206,7 @@ def setup(sphinx):
("py:class", "six.moves.urllib.parse.ParseResult"),
("py:class", "TextIO"),
("py:class", "hashlib._Hash"),
("py:class", "concurrent.futures._base.Executor"),
# Spack classes that are private and we don't want to expose
("py:class", "spack.provider_index._IndexBase"),
("py:class", "spack.repo._PrependFileLoader"),

lib/spack/spack/binary_distribution.py

@@ -22,7 +22,7 @@
import urllib.request
import warnings
from contextlib import closing
from typing import Dict, Iterable, List, NamedTuple, Optional, Set, Tuple
from typing import Dict, Iterable, NamedTuple, Optional, Set, Tuple
import llnl.util.filesystem as fsys
import llnl.util.lang
@@ -46,7 +46,6 @@
import spack.repo
import spack.stage
import spack.store
import spack.traverse as traverse
import spack.util.archive
import spack.util.crypto
import spack.util.file_cache as file_cache
@@ -1201,59 +1200,6 @@ def _build_tarball_in_stage_dir(spec: Spec, out_url: str, stage_dir: str, option
generate_package_index(url_util.join(out_url, os.path.relpath(cache_prefix, stage_dir)))
class NotInstalledError(spack.error.SpackError):
"""Raised when a spec is not installed but picked to be packaged."""
def __init__(self, specs: List[Spec]):
super().__init__(
"Cannot push non-installed packages",
", ".join(s.cformat("{name}{@version}{/hash:7}") for s in specs),
)
def specs_to_be_packaged(
specs: List[Spec], root: bool = True, dependencies: bool = True
) -> List[Spec]:
"""Return the list of nodes to be packaged, given a list of specs.
Raises NotInstalledError if a spec is not installed but picked to be packaged.
Args:
specs: list of root specs to be processed
root: include the root of each spec in the nodes
dependencies: include the dependencies of each
spec in the nodes
"""
if not root and not dependencies:
return []
# Filter packageable roots
with spack.store.STORE.db.read_transaction():
if root:
# Error on uninstalled roots, when roots are requested
uninstalled_roots = list(s for s in specs if not s.installed)
if uninstalled_roots:
raise NotInstalledError(uninstalled_roots)
roots = specs
else:
roots = []
if dependencies:
# Error on uninstalled deps, when deps are requested
deps = list(
traverse.traverse_nodes(
specs, deptype="all", order="breadth", root=False, key=traverse.by_dag_hash
)
)
uninstalled_deps = list(s for s in deps if not s.installed)
if uninstalled_deps:
raise NotInstalledError(uninstalled_deps)
else:
deps = []
return [s for s in itertools.chain(roots, deps) if not s.external]
def try_verify(specfile_path):
"""Utility function to attempt to verify a local file. Assumes the
file is a clearsigned signature file.
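
The selection logic removed above does not disappear: it is reshaped into
_specs_to_be_packaged in the buildcache command below. A minimal sketch of
the traversal both versions rely on (roots is a placeholder list of specs;
keying on the DAG hash deduplicates nodes shared between roots):

    import spack.traverse as traverse

    nodes = list(
        traverse.traverse_nodes(
            roots, root=True, deptype="all", order="breadth", key=traverse.by_dag_hash
        )
    )
    uninstalled = [s for s in nodes if not s.installed]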

lib/spack/spack/cmd/buildcache.py

@@ -3,25 +3,25 @@
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import argparse
import concurrent.futures
import copy
import glob
import hashlib
import json
import multiprocessing
import multiprocessing.pool
import os
import shutil
import sys
import tempfile
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, List, Optional, Tuple
import llnl.util.tty as tty
from llnl.string import plural
from llnl.util.lang import elide_list
from llnl.util.lang import elide_list, stable_partition
import spack.binary_distribution as bindist
import spack.cmd
import spack.config
import spack.deptypes as dt
import spack.environment as ev
import spack.error
import spack.hash_types as ht
@@ -35,10 +35,10 @@
import spack.store
import spack.user_environment
import spack.util.crypto
import spack.util.parallel
import spack.util.url as url_util
import spack.util.web as web_util
from spack import traverse
from spack.build_environment import determine_number_of_jobs
from spack.cmd import display_specs
from spack.cmd.common import arguments
from spack.oci.image import (
@@ -112,6 +112,17 @@ def setup_parser(subparser: argparse.ArgumentParser):
"Alternatively, one can decide to build a cache for only the package or only the "
"dependencies",
)
with_or_without_build_deps = push.add_mutually_exclusive_group()
with_or_without_build_deps.add_argument(
"--with-build-dependencies",
action="store_true",
help="include build dependencies in the buildcache",
)
with_or_without_build_deps.add_argument(
"--without-build-dependencies",
action="store_true",
help="exclude build dependencies from the buildcache",
)
push.add_argument(
"--fail-fast",
action="store_true",
@@ -336,32 +347,6 @@ def _progress(i: int, total: int):
return ""
class NoPool:
def map(self, func, args):
return [func(a) for a in args]
def starmap(self, func, args):
return [func(*a) for a in args]
def __enter__(self):
return self
def __exit__(self, *args):
pass
MaybePool = Union[multiprocessing.pool.Pool, NoPool]
def _make_pool() -> MaybePool:
"""Can't use threading because it's unsafe, and can't use spawned processes because of globals.
That leaves only forking"""
if multiprocessing.get_start_method() == "fork":
return multiprocessing.pool.Pool(determine_number_of_jobs(parallel=True))
else:
return NoPool()
def _skip_no_redistribute_for_public(specs):
remaining_specs = list()
removed_specs = list()
@@ -381,6 +366,47 @@ def _skip_no_redistribute_for_public(specs):
return remaining_specs
class PackagesAreNotInstalledError(spack.error.SpackError):
"""Raised when a list of specs is not installed but picked to be packaged."""
def __init__(self, specs: List[Spec]):
super().__init__(
"Cannot push non-installed packages",
", ".join(elide_list(list(_format_spec(s) for s in specs), 5)),
)
class PackageNotInstalledError(spack.error.SpackError):
"""Raised when a spec is not installed but picked to be packaged."""
class MissingLayerError(spack.error.SpackError):
"""Raised when a required layer for a dependency is missing in an OCI registry."""
def _specs_to_be_packaged(
requested: List[Spec], things_to_install: str, build_deps: bool
) -> List[Spec]:
"""Collect all non-external with or without roots and dependencies"""
if "dependencies" not in things_to_install:
deptype = dt.NONE
elif build_deps:
deptype = dt.ALL
else:
deptype = dt.RUN | dt.LINK | dt.TEST
return [
s
for s in traverse.traverse_nodes(
requested,
root="package" in things_to_install,
deptype=deptype,
order="breadth",
key=traverse.by_dag_hash,
)
if not s.external
]
def push_fn(args):
"""create a binary package and push it to a mirror"""
if args.spec_file:
@@ -420,40 +446,52 @@ def push_fn(args):
"Use --unsigned to silence this warning."
)
# This is a list of installed, non-external specs.
specs = bindist.specs_to_be_packaged(
specs = _specs_to_be_packaged(
roots,
root="package" in args.things_to_install,
dependencies="dependencies" in args.things_to_install,
things_to_install=args.things_to_install,
build_deps=args.with_build_dependencies or not args.without_build_dependencies,
)
if not args.private:
specs = _skip_no_redistribute_for_public(specs)
# When pushing multiple specs, print the url once ahead of time, as well as how
# many specs are being pushed.
if len(specs) > 1:
tty.info(f"Selected {len(specs)} specs to push to {push_url}")
failed = []
# Pushing uninstalled specs is an error. Either fail fast or populate the error list and
# push the installed packages in best-effort mode.
failed: List[Tuple[Spec, BaseException]] = []
with spack.store.STORE.db.read_transaction():
if any(not s.installed for s in specs):
specs, not_installed = stable_partition(specs, lambda s: s.installed)
if args.fail_fast:
raise PackagesAreNotInstalledError(not_installed)
else:
failed.extend(
(s, PackageNotInstalledError("package not installed")) for s in not_installed
)
# TODO: unify this logic in the future.
# TODO: move into bindist.push_or_raise
if target_image:
base_image = ImageReference.from_string(args.base_image) if args.base_image else None
with tempfile.TemporaryDirectory(
dir=spack.stage.get_stage_root()
) as tmpdir, _make_pool() as pool:
skipped, base_images, checksums = _push_oci(
) as tmpdir, spack.util.parallel.make_concurrent_executor() as executor:
skipped, base_images, checksums, upload_errors = _push_oci(
target_image=target_image,
base_image=base_image,
installed_specs_with_deps=specs,
force=args.force,
tmpdir=tmpdir,
pool=pool,
executor=executor,
)
if upload_errors:
failed.extend(upload_errors)
# Apart from creating manifests for each individual spec, we allow users to create a
# separate image tag for all root specs and their runtime dependencies.
if args.tag:
elif args.tag:
tagged_image = target_image.with_tag(args.tag)
# _push_oci may not populate base_images if binaries were already in the registry
for spec in roots:
@@ -496,7 +534,7 @@ def push_fn(args):
e, (bindist.PickKeyException, bindist.NoKeyException)
):
raise
failed.append((_format_spec(spec), e))
failed.append((spec, e))
if skipped:
if len(specs) == 1:
@@ -519,7 +557,13 @@ def push_fn(args):
raise spack.error.SpackError(
f"The following {len(failed)} errors occurred while pushing specs to the buildcache",
"\n".join(
elide_list([f" {spec}: {e.__class__.__name__}: {e}" for spec, e in failed], 5)
elide_list(
[
f" {_format_spec(spec)}: {e.__class__.__name__}: {e}"
for spec, e in failed
],
5,
)
),
)
@@ -529,8 +573,8 @@ def push_fn(args):
if target_image and len(skipped) < len(specs) and args.update_index:
with tempfile.TemporaryDirectory(
dir=spack.stage.get_stage_root()
) as tmpdir, _make_pool() as pool:
_update_index_oci(target_image, tmpdir, pool)
) as tmpdir, spack.util.parallel.make_concurrent_executor() as executor:
_update_index_oci(target_image, tmpdir, executor)
def _get_spack_binary_blob(image_ref: ImageReference) -> Optional[spack.oci.oci.Blob]:
@@ -604,17 +648,12 @@ def _put_manifest(
):
architecture = _archspec_to_gooarch(specs[0])
dependencies = list(
reversed(
list(
s
for s in traverse.traverse_nodes(
specs, order="topo", deptype=("link", "run"), root=True
)
if not s.external
)
)
)
expected_blobs: List[Spec] = [
s
for s in traverse.traverse_nodes(specs, order="topo", deptype=("link", "run"), root=True)
if not s.external
]
expected_blobs.reverse()
base_manifest, base_config = base_images[architecture]
env = _retrieve_env_dict_from_config(base_config)
@@ -633,9 +672,16 @@ def _put_manifest(
# Create an oci.image.config file
config = copy.deepcopy(base_config)
# Add the diff ids of the dependencies
for s in dependencies:
config["rootfs"]["diff_ids"].append(str(checksums[s.dag_hash()].uncompressed_digest))
# Add the diff ids of the blobs
for s in expected_blobs:
# If a layer for a dependency has gone missing (due to a removed manifest in the registry, a
# failed push, or a local forced uninstall), we cannot create a runnable container image.
# If an OCI registry is only used for storage, this is not a hard error, but for now we
# raise an exception unconditionally, until someone requests a more lenient behavior.
checksum = checksums.get(s.dag_hash())
if not checksum:
raise MissingLayerError(f"missing layer for {_format_spec(s)}")
config["rootfs"]["diff_ids"].append(str(checksum.uncompressed_digest))
# Set the environment variables
config["config"]["Env"] = [f"{k}={v}" for k, v in env.items()]
@@ -679,7 +725,7 @@ def _put_manifest(
"digest": str(checksums[s.dag_hash()].compressed_digest),
"size": checksums[s.dag_hash()].size,
}
for s in dependencies
for s in expected_blobs
),
],
}
@@ -724,9 +770,14 @@ def _push_oci(
base_image: Optional[ImageReference],
installed_specs_with_deps: List[Spec],
tmpdir: str,
pool: MaybePool,
executor: concurrent.futures.Executor,
force: bool = False,
) -> Tuple[List[str], Dict[str, Tuple[dict, dict]], Dict[str, spack.oci.oci.Blob]]:
) -> Tuple[
List[str],
Dict[str, Tuple[dict, dict]],
Dict[str, spack.oci.oci.Blob],
List[Tuple[Spec, BaseException]],
]:
"""Push specs to an OCI registry
Args:
@@ -739,7 +790,8 @@ def _push_oci(
Returns:
A tuple consisting of the list of skipped specs already in the build cache,
a dictionary mapping architectures to base image manifests and configs,
and a dictionary mapping each spec's dag hash to a blob.
a dictionary mapping each spec's dag hash to a blob,
and a list of (spec, error) tuples for failed uploads.
"""
# Reverse the order
@@ -756,39 +808,50 @@ def _push_oci(
if not force:
tty.info("Checking for existing specs in the buildcache")
to_be_uploaded = []
blobs_to_upload = []
tags_to_check = (target_image.with_tag(default_tag(s)) for s in installed_specs_with_deps)
available_blobs = pool.map(_get_spack_binary_blob, tags_to_check)
available_blobs = executor.map(_get_spack_binary_blob, tags_to_check)
for spec, maybe_blob in zip(installed_specs_with_deps, available_blobs):
if maybe_blob is not None:
checksums[spec.dag_hash()] = maybe_blob
skipped.append(_format_spec(spec))
else:
to_be_uploaded.append(spec)
blobs_to_upload.append(spec)
else:
to_be_uploaded = installed_specs_with_deps
blobs_to_upload = installed_specs_with_deps
if not to_be_uploaded:
return skipped, base_images, checksums
if not blobs_to_upload:
return skipped, base_images, checksums, []
tty.info(
f"{len(to_be_uploaded)} specs need to be pushed to "
f"{len(blobs_to_upload)} specs need to be pushed to "
f"{target_image.domain}/{target_image.name}"
)
# Upload blobs
new_blobs = pool.starmap(
_push_single_spack_binary_blob, ((target_image, spec, tmpdir) for spec in to_be_uploaded)
)
blob_futures = [
executor.submit(_push_single_spack_binary_blob, target_image, spec, tmpdir)
for spec in blobs_to_upload
]
# And update the spec to blob mapping
for spec, blob in zip(to_be_uploaded, new_blobs):
checksums[spec.dag_hash()] = blob
concurrent.futures.wait(blob_futures)
manifests_to_upload: List[Spec] = []
errors: List[Tuple[Spec, BaseException]] = []
# And update the spec to blob mapping for successful uploads
for spec, blob_future in zip(blobs_to_upload, blob_futures):
error = blob_future.exception()
if error is None:
manifests_to_upload.append(spec)
checksums[spec.dag_hash()] = blob_future.result()
else:
errors.append((spec, error))
# Copy base images if necessary
for spec in to_be_uploaded:
for spec in manifests_to_upload:
_update_base_images(
base_image=base_image,
target_image=target_image,
@@ -807,30 +870,35 @@ def extra_config(spec: Spec):
# Upload manifests
tty.info("Uploading manifests")
pool.starmap(
_put_manifest,
(
(
base_images,
checksums,
target_image.with_tag(default_tag(spec)),
tmpdir,
extra_config(spec),
{"org.opencontainers.image.description": spec.format()},
spec,
)
for spec in to_be_uploaded
),
)
manifest_futures = [
executor.submit(
_put_manifest,
base_images,
checksums,
target_image.with_tag(default_tag(spec)),
tmpdir,
extra_config(spec),
{"org.opencontainers.image.description": spec.format()},
spec,
)
for spec in manifests_to_upload
]
concurrent.futures.wait(manifest_futures)
# Print the image names of the top-level specs
for spec in to_be_uploaded:
tty.info(f"Pushed {_format_spec(spec)} to {target_image.with_tag(default_tag(spec))}")
for spec, manifest_future in zip(manifests_to_upload, manifest_futures):
error = manifest_future.exception()
if error is None:
tty.info(f"Pushed {_format_spec(spec)} to {target_image.with_tag(default_tag(spec))}")
else:
errors.append((spec, error))
return skipped, base_images, checksums
return skipped, base_images, checksums, errors
def _config_from_tag(image_ref: ImageReference, tag: str) -> Optional[dict]:
def _config_from_tag(image_ref_and_tag: Tuple[ImageReference, str]) -> Optional[dict]:
image_ref, tag = image_ref_and_tag
# Don't allow recursion here, since Spack itself always uploads
# vnd.oci.image.manifest.v1+json, not vnd.oci.image.index.v1+json
_, config = get_manifest_and_config_with_retry(image_ref.with_tag(tag), tag, recurse=0)
@@ -840,13 +908,13 @@ def _config_from_tag(image_ref: ImageReference, tag: str) -> Optional[dict]:
return config if "spec" in config else None
def _update_index_oci(image_ref: ImageReference, tmpdir: str, pool: MaybePool) -> None:
def _update_index_oci(
image_ref: ImageReference, tmpdir: str, pool: concurrent.futures.Executor
) -> None:
tags = list_tags(image_ref)
# Fetch all image config files in parallel
spec_dicts = pool.starmap(
_config_from_tag, ((image_ref, tag) for tag in tags if tag_is_spec(tag))
)
spec_dicts = pool.map(_config_from_tag, ((image_ref, tag) for tag in tags if tag_is_spec(tag)))
# Populate the database
db_root_dir = os.path.join(tmpdir, "db_root")
@@ -1182,8 +1250,8 @@ def update_index(mirror: spack.mirror.Mirror, update_keys=False):
if image_ref:
with tempfile.TemporaryDirectory(
dir=spack.stage.get_stage_root()
) as tmpdir, _make_pool() as pool:
_update_index_oci(image_ref, tmpdir, pool)
) as tmpdir, spack.util.parallel.make_concurrent_executor() as executor:
_update_index_oci(image_ref, tmpdir, executor)
return
# Otherwise, assume a normal mirror.
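
Condensed, the new push flow splits the specs once and then chooses
between failing fast and accumulating errors. A sketch of the logic above,
assuming stable_partition returns the matching and non-matching items in
their original order (as the diff uses it):

    installed, not_installed = stable_partition(specs, lambda s: s.installed)
    if not_installed and args.fail_fast:
        # --fail-fast: refuse to push anything at all.
        raise PackagesAreNotInstalledError(not_installed)
    # Best effort: record uninstalled specs as failures and keep pushing.
    failed = [(s, PackageNotInstalledError("package not installed")) for s in not_installed]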

lib/spack/spack/test/cmd/buildcache.py

@@ -12,7 +12,9 @@
import spack.binary_distribution
import spack.cmd.buildcache
import spack.deptypes
import spack.environment as ev
import spack.error
import spack.main
import spack.spec
import spack.util.url
@@ -443,3 +445,54 @@ def test_skip_no_redistribute(mock_packages, config):
filtered = spack.cmd.buildcache._skip_no_redistribute_for_public(specs)
assert not any(s.name == "no-redistribute" for s in filtered)
assert any(s.name == "no-redistribute-dependent" for s in filtered)
def test_best_effort_vs_fail_fast_when_dep_not_installed(tmp_path, mutable_database):
"""When --fail-fast is passed, the push command should fail if it immediately finds an
uninstalled dependency. Otherwise, failure to push one dependency shouldn't prevent the
others from being pushed."""
mirror("add", "--unsigned", "my-mirror", str(tmp_path))
# Uninstall mpich so that its dependent mpileaks can't be pushed
for s in mutable_database.query_local("mpich"):
s.package.do_uninstall(force=True)
with pytest.raises(spack.cmd.buildcache.PackagesAreNotInstalledError, match="mpich"):
buildcache("push", "--update-index", "--fail-fast", "my-mirror", "mpileaks^mpich")
# nothing should be pushed due to --fail-fast.
assert not os.listdir(tmp_path)
assert not spack.binary_distribution.update_cache_and_get_specs()
with pytest.raises(spack.cmd.buildcache.PackageNotInstalledError):
buildcache("push", "--update-index", "my-mirror", "mpileaks^mpich")
specs = spack.binary_distribution.update_cache_and_get_specs()
# everything but mpich should be pushed
mpileaks = mutable_database.query_local("mpileaks^mpich")[0]
assert set(specs) == {s for s in mpileaks.traverse() if s.name != "mpich"}
def test_push_without_build_deps(tmp_path, temporary_store, mock_packages, mutable_config):
"""Spack should not error when build deps are uninstalled and --without-build-dependenies is
passed."""
mirror("add", "--unsigned", "my-mirror", str(tmp_path))
s = spack.spec.Spec("dtrun3").concretized()
s.package.do_install(fake=True)
s["dtbuild3"].package.do_uninstall()
# fails when build deps are required
with pytest.raises(spack.error.SpackError, match="package not installed"):
buildcache(
"push", "--update-index", "--with-build-dependencies", "my-mirror", f"/{s.dag_hash()}"
)
# succeeds when build deps are not required
buildcache(
"push", "--update-index", "--without-build-dependencies", "my-mirror", f"/{s.dag_hash()}"
)
assert spack.binary_distribution.update_cache_and_get_specs() == [s]
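
The two flags exercised by this test map to a dependency-type mask inside
_specs_to_be_packaged; restated here as a small helper (spack.deptypes bit
flags, mirroring the diff above):

    import spack.deptypes as dt

    def selection_deptype(things_to_install: str, build_deps: bool):
        if "dependencies" not in things_to_install:
            return dt.NONE  # roots only ("--only package")
        if build_deps:
            return dt.ALL  # --with-build-dependencies
        return dt.RUN | dt.LINK | dt.TEST  # default / --without-build-dependencies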

lib/spack/spack/test/conftest.py

@@ -56,6 +56,7 @@
import spack.util.executable
import spack.util.git
import spack.util.gpg
import spack.util.parallel
import spack.util.spack_yaml as syaml
import spack.util.url as url_util
import spack.version
@@ -1961,10 +1962,12 @@ def pytest_runtest_setup(item):
pytest.skip(*not_on_windows_marker.args)
@pytest.fixture(scope="function")
@pytest.fixture(autouse=True)
def disable_parallel_buildcache_push(monkeypatch):
"""Disable process pools in tests."""
monkeypatch.setattr(spack.cmd.buildcache, "_make_pool", spack.cmd.buildcache.NoPool)
monkeypatch.setattr(
spack.util.parallel, "make_concurrent_executor", spack.util.parallel.SequentialExecutor
)
def _root_path(x, y, *, path):

lib/spack/spack/test/oci/integration_test.py

@@ -10,12 +10,19 @@
import json
import os
import pathlib
import re
from contextlib import contextmanager
import pytest
import spack.cmd.buildcache
import spack.database
import spack.environment as ev
import spack.error
import spack.oci.opener
import spack.spec
from spack.main import SpackCommand
from spack.oci.image import Digest, ImageReference, default_config, default_manifest
from spack.oci.image import Digest, ImageReference, default_config, default_manifest, default_tag
from spack.oci.oci import blob_exists, get_manifest_and_config, upload_blob, upload_manifest
from spack.test.oci.mock_registry import DummyServer, InMemoryOCIRegistry, create_opener
from spack.util.archive import gzip_compressed_tarfile
@@ -34,7 +41,7 @@ def oci_servers(*servers: DummyServer):
spack.oci.opener.urlopen = old_opener
def test_buildcache_push_command(mutable_database, disable_parallel_buildcache_push):
def test_buildcache_push_command(mutable_database):
with oci_servers(InMemoryOCIRegistry("example.com")):
mirror("add", "oci-test", "oci://example.com/image")
@@ -57,9 +64,7 @@ def test_buildcache_push_command(mutable_database, disable_parallel_buildcache_p
assert os.path.exists(os.path.join(spec.prefix, "bin", "mpileaks"))
def test_buildcache_tag(
install_mockery, mock_fetch, mutable_mock_env_path, disable_parallel_buildcache_push
):
def test_buildcache_tag(install_mockery, mock_fetch, mutable_mock_env_path):
"""Tests whether we can create an OCI image from a full environment with multiple roots."""
env("create", "test")
with ev.read("test"):
@@ -97,9 +102,7 @@ def test_buildcache_tag(
assert len(manifest["layers"]) == 1
def test_buildcache_push_with_base_image_command(
mutable_database, tmpdir, disable_parallel_buildcache_push
):
def test_buildcache_push_with_base_image_command(mutable_database, tmpdir):
"""Test that we can push a package with a base image to an OCI registry.
This test is a bit involved, cause we have to create a small base image."""
@@ -200,7 +203,7 @@ def test_buildcache_push_with_base_image_command(
def test_uploading_with_base_image_in_docker_image_manifest_v2_format(
tmp_path: pathlib.Path, mutable_database, disable_parallel_buildcache_push
tmp_path: pathlib.Path, mutable_database
):
"""If the base image uses an old manifest schema, Spack should also use that.
That is necessary for container images to work with Apptainer, which is rather strict about
@@ -286,3 +289,57 @@ def test_uploading_with_base_image_in_docker_image_manifest_v2_format(
for layer in m["layers"]:
assert layer["mediaType"] == "application/vnd.docker.image.rootfs.diff.tar.gzip"
assert "annotations" not in m
def test_best_effort_upload(mutable_database: spack.database.Database, monkeypatch):
"""Failure to upload a blob or manifest should not prevent others from being uploaded"""
_push_blob = spack.cmd.buildcache._push_single_spack_binary_blob
_push_manifest = spack.cmd.buildcache._put_manifest
def push_blob(image_ref, spec, tmpdir):
# fail to upload the blob of mpich
if spec.name == "mpich":
raise Exception("Blob Server Error")
return _push_blob(image_ref, spec, tmpdir)
def put_manifest(base_images, checksums, image_ref, tmpdir, extra_config, annotations, *specs):
# fail to upload the manifest of libdwarf
if "libdwarf" in (s.name for s in specs):
raise Exception("Manifest Server Error")
return _push_manifest(
base_images, checksums, image_ref, tmpdir, extra_config, annotations, *specs
)
monkeypatch.setattr(spack.cmd.buildcache, "_push_single_spack_binary_blob", push_blob)
monkeypatch.setattr(spack.cmd.buildcache, "_put_manifest", put_manifest)
registry = InMemoryOCIRegistry("example.com")
with oci_servers(registry):
mirror("add", "oci-test", "oci://example.com/image")
with pytest.raises(spack.error.SpackError, match="The following 4 errors occurred") as e:
buildcache("push", "--update-index", "oci-test", "mpileaks^mpich")
error = str(e.value)
# mpich's blob failed to upload
assert re.search("mpich.+: Exception: Blob Server Error", error)
# libdwarf's manifest failed to upload
assert re.search("libdwarf.+: Exception: Manifest Server Error", error)
# since there is no blob for mpich, runtime dependents cannot refer to it in their
# manifests, which is a transitive error.
assert re.search("callpath.+: MissingLayerError: missing layer for mpich", error)
assert re.search("mpileaks.+: MissingLayerError: missing layer for mpich", error)
mpileaks: spack.spec.Spec = mutable_database.query_local("mpileaks^mpich")[0]
# ensure that packages not affected by errors were uploaded still.
uploaded_tags = {tag for _, tag in registry.manifests.keys()}
failures = {"mpich", "libdwarf", "callpath", "mpileaks"}
expected_tags = {default_tag(s) for s in mpileaks.traverse() if s.name not in failures}
assert expected_tags
assert uploaded_tags == expected_tags
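
The callpath and mpileaks assertions follow from the checksum lookup added
to _put_manifest: every link/run dependency must have a blob checksum, so
a single failed blob upload also fails each dependent manifest. In sketch
form (names as in the diff):

    for s in expected_blobs:
        checksum = checksums.get(s.dag_hash())
        if checksum is None:
            # mpich's blob never made it, so dependent images cannot be assembled.
            raise MissingLayerError(f"missing layer for {_format_spec(s)}")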

lib/spack/spack/util/parallel.py

@@ -2,12 +2,15 @@
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import concurrent.futures
import multiprocessing
import os
import sys
import traceback
from typing import Optional
from spack.util.cpus import determine_number_of_jobs
class ErrorFromWorker:
"""Wrapper class to report an error from a worker process"""
@@ -80,3 +83,24 @@ def imap_unordered(
if isinstance(result, ErrorFromWorker):
raise RuntimeError(result.stacktrace if debug else str(result))
yield result
class SequentialExecutor(concurrent.futures.Executor):
"""Executor that runs tasks sequentially in the current thread."""
def submit(self, fn, *args, **kwargs):
"""Submit a function to be executed."""
future = concurrent.futures.Future()
try:
future.set_result(fn(*args, **kwargs))
except Exception as e:
future.set_exception(e)
return future
def make_concurrent_executor() -> concurrent.futures.Executor:
"""Can't use threading because it's unsafe, and can't use spawned processes because of globals.
That leaves only forking."""
if multiprocessing.get_start_method() == "fork":
return concurrent.futures.ProcessPoolExecutor(determine_number_of_jobs(parallel=True))
return SequentialExecutor()
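
Since both executors implement the standard concurrent.futures.Executor
interface, call sites need no fork/no-fork branching. A hypothetical
usage (pow is chosen because callables submitted to the process pool must
be picklable):

    with make_concurrent_executor() as executor:
        futures = [executor.submit(pow, 2, n) for n in range(4)]
        results = [f.result() for f in futures]  # [1, 2, 4, 8]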

share/spack/spack-completion.bash

@@ -571,7 +571,7 @@ _spack_buildcache() {
_spack_buildcache_push() {
if $list_options
then
SPACK_COMPREPLY="-h --help -f --force --unsigned -u --signed --key -k --update-index --rebuild-index --spec-file --only --fail-fast --base-image --tag -t --private -j --jobs"
SPACK_COMPREPLY="-h --help -f --force --unsigned -u --signed --key -k --update-index --rebuild-index --spec-file --only --with-build-dependencies --without-build-dependencies --fail-fast --base-image --tag -t --private -j --jobs"
else
_mirrors
fi
@@ -580,7 +580,7 @@ _spack_buildcache_push() {
_spack_buildcache_create() {
if $list_options
then
SPACK_COMPREPLY="-h --help -f --force --unsigned -u --signed --key -k --update-index --rebuild-index --spec-file --only --fail-fast --base-image --tag -t --private -j --jobs"
SPACK_COMPREPLY="-h --help -f --force --unsigned -u --signed --key -k --update-index --rebuild-index --spec-file --only --with-build-dependencies --without-build-dependencies --fail-fast --base-image --tag -t --private -j --jobs"
else
_mirrors
fi

share/spack/spack-completion.fish

@@ -699,7 +699,7 @@ complete -c spack -n '__fish_spack_using_command buildcache' -s h -l help -f -a
complete -c spack -n '__fish_spack_using_command buildcache' -s h -l help -d 'show this help message and exit'
# spack buildcache push
set -g __fish_spack_optspecs_spack_buildcache_push h/help f/force u/unsigned signed k/key= update-index spec-file= only= fail-fast base-image= t/tag= private j/jobs=
set -g __fish_spack_optspecs_spack_buildcache_push h/help f/force u/unsigned signed k/key= update-index spec-file= only= with-build-dependencies without-build-dependencies fail-fast base-image= t/tag= private j/jobs=
complete -c spack -n '__fish_spack_using_command_pos_remainder 1 buildcache push' -f -k -a '(__fish_spack_specs)'
complete -c spack -n '__fish_spack_using_command buildcache push' -s h -l help -f -a help
complete -c spack -n '__fish_spack_using_command buildcache push' -s h -l help -d 'show this help message and exit'
@@ -717,6 +717,10 @@ complete -c spack -n '__fish_spack_using_command buildcache push' -l spec-file -
complete -c spack -n '__fish_spack_using_command buildcache push' -l spec-file -r -d 'create buildcache entry for spec from json or yaml file'
complete -c spack -n '__fish_spack_using_command buildcache push' -l only -r -f -a 'package dependencies'
complete -c spack -n '__fish_spack_using_command buildcache push' -l only -r -d 'select the buildcache mode. The default is to build a cache for the package along with all its dependencies. Alternatively, one can decide to build a cache for only the package or only the dependencies'
complete -c spack -n '__fish_spack_using_command buildcache push' -l with-build-dependencies -f -a with_build_dependencies
complete -c spack -n '__fish_spack_using_command buildcache push' -l with-build-dependencies -d 'include build dependencies in the buildcache'
complete -c spack -n '__fish_spack_using_command buildcache push' -l without-build-dependencies -f -a without_build_dependencies
complete -c spack -n '__fish_spack_using_command buildcache push' -l without-build-dependencies -d 'exclude build dependencies from the buildcache'
complete -c spack -n '__fish_spack_using_command buildcache push' -l fail-fast -f -a fail_fast
complete -c spack -n '__fish_spack_using_command buildcache push' -l fail-fast -d 'stop pushing on first failure (default is best effort)'
complete -c spack -n '__fish_spack_using_command buildcache push' -l base-image -r -f -a base_image
@@ -729,7 +733,7 @@ complete -c spack -n '__fish_spack_using_command buildcache push' -s j -l jobs -
complete -c spack -n '__fish_spack_using_command buildcache push' -s j -l jobs -r -d 'explicitly set number of parallel jobs'
# spack buildcache create
set -g __fish_spack_optspecs_spack_buildcache_create h/help f/force u/unsigned signed k/key= update-index spec-file= only= fail-fast base-image= t/tag= private j/jobs=
set -g __fish_spack_optspecs_spack_buildcache_create h/help f/force u/unsigned signed k/key= update-index spec-file= only= with-build-dependencies without-build-dependencies fail-fast base-image= t/tag= private j/jobs=
complete -c spack -n '__fish_spack_using_command_pos_remainder 1 buildcache create' -f -k -a '(__fish_spack_specs)'
complete -c spack -n '__fish_spack_using_command buildcache create' -s h -l help -f -a help
complete -c spack -n '__fish_spack_using_command buildcache create' -s h -l help -d 'show this help message and exit'
@@ -747,6 +751,10 @@ complete -c spack -n '__fish_spack_using_command buildcache create' -l spec-file
complete -c spack -n '__fish_spack_using_command buildcache create' -l spec-file -r -d 'create buildcache entry for spec from json or yaml file'
complete -c spack -n '__fish_spack_using_command buildcache create' -l only -r -f -a 'package dependencies'
complete -c spack -n '__fish_spack_using_command buildcache create' -l only -r -d 'select the buildcache mode. The default is to build a cache for the package along with all its dependencies. Alternatively, one can decide to build a cache for only the package or only the dependencies'
complete -c spack -n '__fish_spack_using_command buildcache create' -l with-build-dependencies -f -a with_build_dependencies
complete -c spack -n '__fish_spack_using_command buildcache create' -l with-build-dependencies -d 'include build dependencies in the buildcache'
complete -c spack -n '__fish_spack_using_command buildcache create' -l without-build-dependencies -f -a without_build_dependencies
complete -c spack -n '__fish_spack_using_command buildcache create' -l without-build-dependencies -d 'exclude build dependencies from the buildcache'
complete -c spack -n '__fish_spack_using_command buildcache create' -l fail-fast -f -a fail_fast
complete -c spack -n '__fish_spack_using_command buildcache create' -l fail-fast -d 'stop pushing on first failure (default is best effort)'
complete -c spack -n '__fish_spack_using_command buildcache create' -l base-image -r -f -a base_image