OCI buildcache (#38358)

Credits to @ChristianKniep for advocating the idea of OCI image layers
being identical to spack buildcache tarballs.

With this you can configure an OCI registry as a buildcache:

```console
$ spack mirror add my_registry oci://user/image # Dockerhub

$ spack mirror add my_registry oci://ghcr.io/haampie/spack-test # GHCR

$ spack mirror set --push --oci-username ... --oci-password ... my_registry  # set login credentials
```

which should result in this config:

```yaml
mirrors:
  my_registry:
    url: oci://ghcr.io/haampie/spack-test
    push:
      access_pair: [<username>, <password>]
```

It can be used like any other buildcache:

```console
$ spack buildcache push my_registry [specs...]
```

It will upload the Spack tarballs in parallel, as well as the manifest and config
files, so that the binaries are compatible with `docker pull` or `skopeo copy`.
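
For instance (using the image and tag from the example below), a pushed spec can
be pulled or copied directly:

```console
$ docker pull ghcr.io/haampie/spack-test:python-3.11.2-65txfcpqbmpawclvtasuog4yzmxwaoia.spack
$ skopeo copy docker://ghcr.io/haampie/spack-test:python-3.11.2-65txfcpqbmpawclvtasuog4yzmxwaoia.spack oci:./python-image
```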

In fact, a base image can be added to get a _runnable_ image:

```console
$ spack buildcache push --base-image ubuntu:23.04 my_registry python
Pushed ... as [image]:python-3.11.2-65txfcpqbmpawclvtasuog4yzmxwaoia.spack

$ docker run --rm -it [image]:python-3.11.2-65txfcpqbmpawclvtasuog4yzmxwaoia.spack
```

which should really be a game changer for sharing binaries.

Further, all content-addressable blobs that are downloaded and verified
will be cached in Spack's download cache. This should make repeated
`push` commands faster, as well as `push` followed by a separate
`update-index` command.

An end-to-end example of how to use this in GitHub Actions is here:

**https://github.com/haampie/spack-oci-buildcache-example**


TODO:

- [x] Generate environment modifications in config so PATH is set up
- [x] Enrich config with Spack's `spec` json (this is allowed in the OCI specification)
- [x] When ^ is done, add logic to create an index in say `<image>:index` by fetching all config files (using OCI distribution discovery API)
- [x] Add logic to use object storage in an OCI registry in `spack install`.
- [x] Make the user pick the base image for generated OCI images.
- [x] Update buildcache install logic to deal with absolute paths in tarballs
- [x] Merge with `spack buildcache` command
- [x] Merge #37441 (included here)
- [x] Merge #39077 (included here)
- [x] #39187 + #39285
- [x] #39341
- [x] Not a blocker: #35737 fixes correctness run env for the generated container images

NOTE:

1. `oci://` is unfortunately taken, so it's being abused in this PR to mean "oci type mirror". `skopeo` uses `docker://` which I'd like to avoid, given that classical docker v1 registries are not supported.
2. this is currently `https`-only, given that basic auth is used to login. I _could_ be convinced to allow http, but I'd prefer not to, given that for a `spack buildcache push` command multiple domains can be involved (auth server, source of base image, destination registry). Right now, no urllib http handler is added, so redirects to https and auth servers with http urls will simply result in a hard failure.

CAVEATS:

1. Signing is not implemented in this PR. `gpg --clearsign` is not the nicest solution, since (a) the spec.json is merged into the image config, which must be valid json, and (b) it would be better to sign the manifest (referencing both config/spec file and tarball) using more conventional image signing tools
2. `spack.binary_distribution.push` is not yet implemented for the OCI buildcache, only `spack buildcache push` is. This is because I'd like to always push images + deps to the registry, so that it's `docker pull`-able, whereas in `spack ci` we really want to push an individual package without its deps to say `pr-xyz`, while its deps reside in some `develop` buildcache.
3. The `push -j ...` flag only works for OCI buildcache, not for others
Commit 195f965076 by Harmen Stoppels, 2023-10-27 15:30:04 +02:00
(committed by GitHub; parent 3fff8be929)
24 changed files with 3473 additions and 236 deletions


@@ -156,6 +156,131 @@ List of popular build caches
* `Extreme-scale Scientific Software Stack (E4S) <https://e4s-project.github.io/>`_: `build cache <https://oaciss.uoregon.edu/e4s/inventory.html>`_

-----------------------------------------
OCI / Docker V2 registries as build cache
-----------------------------------------

Spack can also use OCI or Docker V2 registries such as Dockerhub, Quay.io,
GitHub Packages, GitLab Container Registry, JFrog Artifactory, and others
as build caches. This is a convenient way to share binaries using public
infrastructure, or to cache Spack-built binaries in GitHub Actions and
GitLab CI.

To get started, configure an OCI mirror using ``oci://`` as the scheme,
and optionally specify a username and password (or personal access token):

.. code-block:: console

   $ spack mirror add --oci-username username --oci-password password my_registry oci://example.com/my_image

Spack follows the naming conventions of Docker, with Dockerhub as the default
registry. To use Dockerhub, you can omit the registry domain:

.. code-block:: console

   $ spack mirror add --oci-username username --oci-password password my_registry oci://username/my_image

From here, you can use the mirror as any other build cache:

.. code-block:: console

   $ spack buildcache push my_registry <specs...>  # push to the registry
   $ spack install <specs...>                      # install from the registry

A unique feature of buildcaches on top of OCI registries is that it's incredibly
easy to generate a runnable container image with the binaries installed. This
is a great way to make applications available to users without requiring them to
install Spack -- all you need is Docker, Podman or any other OCI-compatible container
runtime.

To produce container images, all you need to do is add the ``--base-image`` flag
when pushing to the build cache:

.. code-block:: console

   $ spack buildcache push --base-image ubuntu:20.04 my_registry ninja
   Pushed to example.com/my_image:ninja-1.11.1-yxferyhmrjkosgta5ei6b4lqf6bxbscz.spack

   $ docker run -it example.com/my_image:ninja-1.11.1-yxferyhmrjkosgta5ei6b4lqf6bxbscz.spack
   root@e4c2b6f6b3f4:/# ninja --version
   1.11.1

If ``--base-image`` is not specified, distroless images are produced. In practice,
you won't be able to run these as containers, since they don't come with libc and
other system dependencies. However, they are still compatible with tools like
``skopeo``, ``podman``, and ``docker`` for pulling and pushing.
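
For example, a distroless image can still be copied between registries with
``skopeo`` (the image name below is illustrative):

.. code-block:: console

   $ skopeo copy docker://example.com/my_image:ninja-1.11.1-yxferyhmrjkosgta5ei6b4lqf6bxbscz.spack dir:./ninja-image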

.. note::

   The docker ``overlayfs2`` storage driver is limited to 128 layers, above which a
   ``max depth exceeded`` error may be produced when pulling the image. There
   are `alternative drivers <https://docs.docker.com/storage/storagedriver/>`_.

------------------------------------
Using a buildcache in GitHub Actions
------------------------------------

GitHub Actions is a popular CI/CD platform for building and testing software,
but each CI job has limited resources, making from-source builds too slow for
many applications. Spack build caches can be used to share binaries between CI
runs, speeding up CI significantly.

A typical workflow is to include a ``spack.yaml`` environment in your repository
that specifies the packages to install:

.. code-block:: yaml

   spack:
     specs: [pkg-x, pkg-y]
     packages:
       all:
         require: target=x86_64_v2
     mirrors:
       github_packages: oci://ghcr.io/<user>/<repo>

And a GitHub action that sets up Spack, installs packages from the build cache
or from sources, and pushes newly built binaries to the build cache:

.. code-block:: yaml

   name: Install Spack packages

   on: push

   env:
     SPACK_COLOR: always

   jobs:
     example:
       runs-on: ubuntu-22.04
       steps:
         - name: Checkout
           uses: actions/checkout@v3

         - name: Install Spack
           run: |
             git clone --depth=1 https://github.com/spack/spack.git
             echo "$PWD/spack/bin/" >> "$GITHUB_PATH"

         - name: Concretize
           run: spack -e . concretize

         - name: Install
           run: spack -e . install --no-check-signature --fail-fast

         - name: Push to buildcache
           run: |
             spack -e . mirror set --oci-username <user> --oci-password "${{ secrets.GITHUB_TOKEN }}" github_packages
             spack -e . buildcache push --base-image ubuntu:22.04 --unsigned --update-index github_packages
           if: always()

The first time this action runs, it will build the packages from source and
push them to the build cache. Subsequent runs will pull the binaries from the
build cache. The concretizer will ensure that prebuilt binaries are favored
over source builds.

The build cache entries appear in the GitHub Packages section of your repository,
and contain instructions for pulling and running them with ``docker`` or ``podman``.

----------
Relocation
----------


@@ -204,6 +204,7 @@ def setup(sphinx):
("py:class", "clingo.Control"),
("py:class", "six.moves.urllib.parse.ParseResult"),
("py:class", "TextIO"),
("py:class", "hashlib._Hash"),
# Spack classes that are private and we don't want to expose
("py:class", "spack.provider_index._IndexBase"),
("py:class", "spack.repo._PrependFileLoader"),


@@ -5,11 +5,13 @@
import codecs
import collections
import errno
import hashlib
import io
import itertools
import json
import os
import pathlib
import re
import shutil
import sys
@@ -31,6 +33,7 @@
import llnl.util.tty as tty
from llnl.util.filesystem import BaseDirectoryVisitor, mkdirp, visit_directory_tree
import spack.caches
import spack.cmd
import spack.config as config
import spack.database as spack_db
@@ -38,6 +41,9 @@
import spack.hooks
import spack.hooks.sbang
import spack.mirror
import spack.oci.image
import spack.oci.oci
import spack.oci.opener
import spack.platforms
import spack.relocate as relocate
import spack.repo
@@ -471,14 +477,18 @@ def _fetch_and_cache_index(self, mirror_url, cache_entry={}):
FetchIndexError
"""
# TODO: get rid of this request, handle 404 better
if not web_util.url_exists(
scheme = urllib.parse.urlparse(mirror_url).scheme
if scheme != "oci" and not web_util.url_exists(
url_util.join(mirror_url, _build_cache_relative_path, "index.json")
):
return False
etag = cache_entry.get("etag", None)
if etag:
fetcher = EtagIndexFetcher(mirror_url, etag)
if scheme == "oci":
# TODO: Actually etag and OCI are not mutually exclusive...
fetcher = OCIIndexFetcher(mirror_url, cache_entry.get("index_hash", None))
elif cache_entry.get("etag"):
fetcher = EtagIndexFetcher(mirror_url, cache_entry["etag"])
else:
fetcher = DefaultIndexFetcher(
mirror_url, local_hash=cache_entry.get("index_hash", None)
@@ -622,21 +632,14 @@ def build_cache_prefix(prefix):
def buildinfo_file_name(prefix):
"""
Filename of the binary package meta-data file
"""
return os.path.join(prefix, ".spack/binary_distribution")
"""Filename of the binary package meta-data file"""
return os.path.join(prefix, ".spack", "binary_distribution")
def read_buildinfo_file(prefix):
"""
Read buildinfo file
"""
filename = buildinfo_file_name(prefix)
with open(filename, "r") as inputfile:
content = inputfile.read()
buildinfo = syaml.load(content)
return buildinfo
"""Read buildinfo file"""
with open(buildinfo_file_name(prefix), "r") as f:
return syaml.load(f)
class BuildManifestVisitor(BaseDirectoryVisitor):
@@ -819,18 +822,6 @@ def tarball_path_name(spec, ext):
return os.path.join(tarball_directory_name(spec), tarball_name(spec, ext))
def checksum_tarball(file):
# calculate sha256 hash of tar file
block_size = 65536
hasher = hashlib.sha256()
with open(file, "rb") as tfile:
buf = tfile.read(block_size)
while len(buf) > 0:
hasher.update(buf)
buf = tfile.read(block_size)
return hasher.hexdigest()
def select_signing_key(key=None):
if key is None:
keys = spack.util.gpg.signing_keys()
@@ -1147,14 +1138,17 @@ def gzip_compressed_tarfile(path):
# compresslevel=6 gzip default: llvm takes 4mins, roughly 2.1GB
# compresslevel=9 python default: llvm takes 12mins, roughly 2.1GB
# So we follow gzip.
with open(path, "wb") as fileobj, closing(
GzipFile(filename="", mode="wb", compresslevel=6, mtime=0, fileobj=fileobj)
) as gzip_file, tarfile.TarFile(name="", mode="w", fileobj=gzip_file) as tar:
yield tar
with open(path, "wb") as f, ChecksumWriter(f) as inner_checksum, closing(
GzipFile(filename="", mode="wb", compresslevel=6, mtime=0, fileobj=inner_checksum)
) as gzip_file, ChecksumWriter(gzip_file) as outer_checksum, tarfile.TarFile(
name="", mode="w", fileobj=outer_checksum
) as tar:
yield tar, inner_checksum, outer_checksum
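# Note: ``inner_checksum`` wraps the on-disk file object and therefore hashes
# the gzip-compressed bytes (the OCI blob digest), while ``outer_checksum``
# sits between tarfile and gzip and hashes the uncompressed tar stream (the
# OCI layer "diff_id"). Both checksums fall out of a single write pass.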
def _tarinfo_name(p: str):
return p.lstrip("/")
def _tarinfo_name(absolute_path: str, *, _path=pathlib.PurePath) -> str:
"""Compute tarfile entry name as the relative path from the (system) root."""
return _path(*_path(absolute_path).parts[1:]).as_posix()
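# For example (illustrative path): _tarinfo_name("/opt/spack/zlib-1.3")
# returns "opt/spack/zlib-1.3".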
def tarfile_of_spec_prefix(tar: tarfile.TarFile, prefix: str) -> None:
@@ -1234,8 +1228,88 @@ def tarfile_of_spec_prefix(tar: tarfile.TarFile, prefix: str) -> None:
dir_stack.extend(reversed(new_dirs)) # we pop, so reverse to stay alphabetical
class ChecksumWriter(io.BufferedIOBase):
"""Checksum writer computes a checksum while writing to a file."""
myfileobj = None
def __init__(self, fileobj, algorithm=hashlib.sha256):
self.fileobj = fileobj
self.hasher = algorithm()
self.length = 0
def hexdigest(self):
return self.hasher.hexdigest()
def write(self, data):
if isinstance(data, (bytes, bytearray)):
length = len(data)
else:
data = memoryview(data)
length = data.nbytes
if length > 0:
self.fileobj.write(data)
self.hasher.update(data)
self.length += length
return length
def read(self, size=-1):
raise OSError(errno.EBADF, "read() on write-only object")
def read1(self, size=-1):
raise OSError(errno.EBADF, "read1() on write-only object")
def peek(self, n):
raise OSError(errno.EBADF, "peek() on write-only object")
@property
def closed(self):
return self.fileobj is None
def close(self):
fileobj = self.fileobj
if fileobj is None:
return
self.fileobj.close()
self.fileobj = None
def flush(self):
self.fileobj.flush()
def fileno(self):
return self.fileobj.fileno()
def rewind(self):
raise OSError("Can't rewind while computing checksum")
def readable(self):
return False
def writable(self):
return True
def seekable(self):
return True
def tell(self):
return self.fileobj.tell()
def seek(self, offset, whence=io.SEEK_SET):
# In principle forward seek is possible with b"0" padding,
# but this is not implemented.
if offset == 0 and whence == io.SEEK_CUR:
return
raise OSError("Can't seek while computing checksum")
def readline(self, size=-1):
raise OSError(errno.EBADF, "readline() on write-only object")
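# Usage sketch: stream bytes to a file while computing their sha256 in the
# same pass, e.g.
#
#   with open("/tmp/blob", "wb") as f:
#       writer = ChecksumWriter(f)
#       writer.write(b"some data")
#   digest = writer.hexdigest()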
def _do_create_tarball(tarfile_path: str, binaries_dir: str, buildinfo: dict):
with gzip_compressed_tarfile(tarfile_path) as tar:
with gzip_compressed_tarfile(tarfile_path) as (tar, inner_checksum, outer_checksum):
# Tarball the install prefix
tarfile_of_spec_prefix(tar, binaries_dir)
@@ -1247,6 +1321,8 @@ def _do_create_tarball(tarfile_path: str, binaries_dir: str, buildinfo: dict):
tarinfo.mode = 0o644
tar.addfile(tarinfo, io.BytesIO(bstring))
return inner_checksum.hexdigest(), outer_checksum.hexdigest()
class PushOptions(NamedTuple):
#: Overwrite existing tarball/metadata files in buildcache
@@ -1322,13 +1398,9 @@ def _build_tarball_in_stage_dir(spec: Spec, out_url: str, stage_dir: str, option
# create info for later relocation and create tar
buildinfo = get_buildinfo_dict(spec)
_do_create_tarball(tarfile_path, binaries_dir, buildinfo)
# get the sha256 checksum of the tarball
checksum = checksum_tarball(tarfile_path)
checksum, _ = _do_create_tarball(tarfile_path, binaries_dir, buildinfo)
# add sha256 checksum to spec.json
with open(spec_file, "r") as inputfile:
content = inputfile.read()
if spec_file.endswith(".json"):
@@ -1371,10 +1443,21 @@ def _build_tarball_in_stage_dir(spec: Spec, out_url: str, stage_dir: str, option
return None
class NotInstalledError(spack.error.SpackError):
"""Raised when a spec is not installed but picked to be packaged."""
def __init__(self, specs: List[Spec]):
super().__init__(
"Cannot push non-installed packages",
", ".join(s.cformat("{name}{@version}{/hash:7}") for s in specs),
)
def specs_to_be_packaged(
specs: List[Spec], root: bool = True, dependencies: bool = True
) -> List[Spec]:
"""Return the list of nodes to be packaged, given a list of specs.
Raises NotInstalledError if a spec is not installed but picked to be packaged.
Args:
specs: list of root specs to be processed
@@ -1382,19 +1465,35 @@ def specs_to_be_packaged(
dependencies: include the dependencies of each
spec in the nodes
"""
if not root and not dependencies:
return []
elif dependencies:
nodes = traverse.traverse_nodes(specs, root=root, deptype="all")
else:
nodes = set(specs)
# Limit to installed non-externals.
packageable = lambda n: not n.external and n.installed
# Mass install check
# Filter packageable roots
with spack.store.STORE.db.read_transaction():
return list(filter(packageable, nodes))
if root:
# Error on uninstalled roots, when roots are requested
uninstalled_roots = list(s for s in specs if not s.installed)
if uninstalled_roots:
raise NotInstalledError(uninstalled_roots)
roots = specs
else:
roots = []
if dependencies:
# Error on uninstalled deps, when deps are requested
deps = list(
traverse.traverse_nodes(
specs, deptype="all", order="breadth", root=False, key=traverse.by_dag_hash
)
)
uninstalled_deps = list(s for s in deps if not s.installed)
if uninstalled_deps:
raise NotInstalledError(uninstalled_deps)
else:
deps = []
return [s for s in itertools.chain(roots, deps) if not s.external]
def push(spec: Spec, mirror_url: str, options: PushOptions):
@@ -1502,8 +1601,6 @@ def download_tarball(spec, unsigned=False, mirrors_for_spec=None):
tarball = tarball_path_name(spec, ".spack")
specfile_prefix = tarball_name(spec, ".spec")
mirrors_to_try = []
# Note on try_first and try_next:
# mirrors_for_spec most likely came from spack caching remote
# mirror indices locally and adding their specs to a local data
@@ -1516,63 +1613,116 @@ def download_tarball(spec, unsigned=False, mirrors_for_spec=None):
try_first = [i["mirror_url"] for i in mirrors_for_spec] if mirrors_for_spec else []
try_next = [i.fetch_url for i in configured_mirrors if i.fetch_url not in try_first]
for url in try_first + try_next:
mirrors_to_try.append(
{
"specfile": url_util.join(url, _build_cache_relative_path, specfile_prefix),
"spackfile": url_util.join(url, _build_cache_relative_path, tarball),
}
)
mirrors = try_first + try_next
tried_to_verify_sigs = []
# Assumes we care more about finding a spec file by preferred ext
# than by mirror priority. This can be made less complicated as
# we remove support for deprecated spec formats and buildcache layouts.
for ext in ["json.sig", "json"]:
for mirror_to_try in mirrors_to_try:
specfile_url = "{0}.{1}".format(mirror_to_try["specfile"], ext)
spackfile_url = mirror_to_try["spackfile"]
local_specfile_stage = try_fetch(specfile_url)
if local_specfile_stage:
local_specfile_path = local_specfile_stage.save_filename
signature_verified = False
for try_signed in (True, False):
for mirror in mirrors:
# If it's an OCI index, do things differently, since we cannot compose URLs.
parsed = urllib.parse.urlparse(mirror)
if ext.endswith(".sig") and not unsigned:
# If we found a signed specfile at the root, try to verify
# the signature immediately. We will not download the
# tarball if we could not verify the signature.
tried_to_verify_sigs.append(specfile_url)
signature_verified = try_verify(local_specfile_path)
if not signature_verified:
tty.warn("Failed to verify: {0}".format(specfile_url))
# TODO: refactor this to some "nice" place.
if parsed.scheme == "oci":
ref = spack.oci.image.ImageReference.from_string(mirror[len("oci://") :]).with_tag(
spack.oci.image.default_tag(spec)
)
if unsigned or signature_verified or not ext.endswith(".sig"):
# We will download the tarball in one of three cases:
# 1. user asked for --no-check-signature
# 2. user didn't ask for --no-check-signature, but we
# found a spec.json.sig and verified the signature already
# 3. neither of the first two cases are true, but this file
# is *not* a signed json (not a spec.json.sig file). That
# means we already looked at all the mirrors and either didn't
# find any .sig files or couldn't verify any of them. But it
# is still possible to find an old style binary package where
# the signature is a detached .asc file in the outer archive
# of the tarball, and in that case, the only way to know is to
# download the tarball. This is a deprecated use case, so if
# something goes wrong during the extraction process (can't
# verify signature, checksum doesn't match) we will fail at
# that point instead of trying to download more tarballs from
# the remaining mirrors, looking for one we can use.
tarball_stage = try_fetch(spackfile_url)
if tarball_stage:
return {
"tarball_stage": tarball_stage,
"specfile_stage": local_specfile_stage,
"signature_verified": signature_verified,
}
# Fetch the manifest
try:
response = spack.oci.opener.urlopen(
urllib.request.Request(
url=ref.manifest_url(),
headers={"Accept": "application/vnd.oci.image.manifest.v1+json"},
)
)
except Exception:
continue
local_specfile_stage.destroy()
# Download the config = spec.json and the relevant tarball
try:
manifest = json.loads(response.read())
spec_digest = spack.oci.image.Digest.from_string(manifest["config"]["digest"])
tarball_digest = spack.oci.image.Digest.from_string(
manifest["layers"][-1]["digest"]
)
except Exception:
continue
with spack.oci.oci.make_stage(
ref.blob_url(spec_digest), spec_digest, keep=True
) as local_specfile_stage:
try:
local_specfile_stage.fetch()
local_specfile_stage.check()
except Exception:
continue
local_specfile_stage.cache_local()
with spack.oci.oci.make_stage(
ref.blob_url(tarball_digest), tarball_digest, keep=True
) as tarball_stage:
try:
tarball_stage.fetch()
tarball_stage.check()
except Exception:
continue
tarball_stage.cache_local()
return {
"tarball_stage": tarball_stage,
"specfile_stage": local_specfile_stage,
"signature_verified": False,
}
else:
ext = "json.sig" if try_signed else "json"
specfile_path = url_util.join(mirror, _build_cache_relative_path, specfile_prefix)
specfile_url = f"{specfile_path}.{ext}"
spackfile_url = url_util.join(mirror, _build_cache_relative_path, tarball)
local_specfile_stage = try_fetch(specfile_url)
if local_specfile_stage:
local_specfile_path = local_specfile_stage.save_filename
signature_verified = False
if try_signed and not unsigned:
# If we found a signed specfile at the root, try to verify
# the signature immediately. We will not download the
# tarball if we could not verify the signature.
tried_to_verify_sigs.append(specfile_url)
signature_verified = try_verify(local_specfile_path)
if not signature_verified:
tty.warn("Failed to verify: {0}".format(specfile_url))
if unsigned or signature_verified or not try_signed:
# We will download the tarball in one of three cases:
# 1. user asked for --no-check-signature
# 2. user didn't ask for --no-check-signature, but we
# found a spec.json.sig and verified the signature already
# 3. neither of the first two cases are true, but this file
# is *not* a signed json (not a spec.json.sig file). That
# means we already looked at all the mirrors and either didn't
# find any .sig files or couldn't verify any of them. But it
# is still possible to find an old style binary package where
# the signature is a detached .asc file in the outer archive
# of the tarball, and in that case, the only way to know is to
# download the tarball. This is a deprecated use case, so if
# something goes wrong during the extraction process (can't
# verify signature, checksum doesn't match) we will fail at
# that point instead of trying to download more tarballs from
# the remaining mirrors, looking for one we can use.
tarball_stage = try_fetch(spackfile_url)
if tarball_stage:
return {
"tarball_stage": tarball_stage,
"specfile_stage": local_specfile_stage,
"signature_verified": signature_verified,
}
local_specfile_stage.destroy()
# Falling through the nested loops means we exhaustively searched
# for all known kinds of spec files on all mirrors and did not find
@@ -1805,7 +1955,7 @@ def _extract_inner_tarball(spec, filename, extract_to, unsigned, remote_checksum
)
# compute the sha256 checksum of the tarball
local_checksum = checksum_tarball(tarfile_path)
local_checksum = spack.util.crypto.checksum(hashlib.sha256, tarfile_path)
expected = remote_checksum["hash"]
# if the checksums don't match don't install
@@ -1866,6 +2016,7 @@ def extract_tarball(spec, download_result, unsigned=False, force=False, timer=ti
spec_dict = sjson.load(content)
bchecksum = spec_dict["binary_cache_checksum"]
filename = download_result["tarball_stage"].save_filename
signature_verified = download_result["signature_verified"]
tmpdir = None
@@ -1898,7 +2049,7 @@ def extract_tarball(spec, download_result, unsigned=False, force=False, timer=ti
)
# compute the sha256 checksum of the tarball
local_checksum = checksum_tarball(tarfile_path)
local_checksum = spack.util.crypto.checksum(hashlib.sha256, tarfile_path)
expected = bchecksum["hash"]
# if the checksums don't match don't install
@@ -2457,7 +2608,7 @@ def get_remote_hash(self):
return None
return remote_hash.decode("utf-8")
def conditional_fetch(self):
def conditional_fetch(self) -> FetchIndexResult:
# Do an intermediate fetch for the hash
# and a conditional fetch for the contents
@@ -2471,12 +2622,12 @@ def conditional_fetch(self):
try:
response = self.urlopen(urllib.request.Request(url_index, headers=self.headers))
except urllib.error.URLError as e:
raise FetchIndexError("Could not fetch index from {}".format(url_index), e)
raise FetchIndexError("Could not fetch index from {}".format(url_index), e) from e
try:
result = codecs.getreader("utf-8")(response).read()
except ValueError as e:
return FetchCacheError("Remote index {} is invalid".format(url_index), e)
raise FetchIndexError("Remote index {} is invalid".format(url_index), e) from e
computed_hash = compute_hash(result)
@@ -2508,7 +2659,7 @@ def __init__(self, url, etag, urlopen=web_util.urlopen):
self.etag = etag
self.urlopen = urlopen
def conditional_fetch(self):
def conditional_fetch(self) -> FetchIndexResult:
# Just do a conditional fetch immediately
url = url_util.join(self.url, _build_cache_relative_path, "index.json")
headers = {
@@ -2539,3 +2690,59 @@ def conditional_fetch(self):
data=result,
fresh=False,
)
class OCIIndexFetcher:
def __init__(self, url: str, local_hash, urlopen=None) -> None:
self.local_hash = local_hash
# Remove oci:// prefix
assert url.startswith("oci://")
self.ref = spack.oci.image.ImageReference.from_string(url[6:])
self.urlopen = urlopen or spack.oci.opener.urlopen
def conditional_fetch(self) -> FetchIndexResult:
"""Download an index from an OCI registry type mirror."""
url_manifest = self.ref.with_tag(spack.oci.image.default_index_tag).manifest_url()
try:
response = self.urlopen(
urllib.request.Request(
url=url_manifest,
headers={"Accept": "application/vnd.oci.image.manifest.v1+json"},
)
)
except urllib.error.URLError as e:
raise FetchIndexError(
"Could not fetch manifest from {}".format(url_manifest), e
) from e
try:
manifest = json.loads(response.read())
except Exception as e:
raise FetchIndexError("Remote index {} is invalid".format(url_manifest), e) from e
# Get first blob hash, which should be the index.json
try:
index_digest = spack.oci.image.Digest.from_string(manifest["layers"][0]["digest"])
except Exception as e:
raise FetchIndexError("Remote index {} is invalid".format(url_manifest), e) from e
# Fresh?
if index_digest.digest == self.local_hash:
return FetchIndexResult(etag=None, hash=None, data=None, fresh=True)
# Otherwise fetch the blob / index.json
response = self.urlopen(
urllib.request.Request(
url=self.ref.blob_url(index_digest),
headers={"Accept": "application/vnd.oci.image.layer.v1.tar+gzip"},
)
)
result = codecs.getreader("utf-8")(response).read()
# Make sure the blob we download has the advertised hash
if compute_hash(result) != index_digest.digest:
raise FetchIndexError(f"Remote index {url_manifest} is invalid")
return FetchIndexResult(etag=None, hash=index_digest.digest, data=result, fresh=False)


@@ -3,16 +3,19 @@
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import argparse
import copy
import glob
import hashlib
import json
import multiprocessing.pool
import os
import shutil
import sys
import tempfile
from typing import List
import urllib.request
from typing import Dict, List, Optional, Tuple
import llnl.util.tty as tty
import llnl.util.tty.color as clr
from llnl.string import plural
from llnl.util.lang import elide_list
@@ -22,17 +25,37 @@
import spack.config
import spack.environment as ev
import spack.error
import spack.hash_types as ht
import spack.mirror
import spack.oci.oci
import spack.oci.opener
import spack.relocate
import spack.repo
import spack.spec
import spack.stage
import spack.store
import spack.user_environment
import spack.util.crypto
import spack.util.url as url_util
import spack.util.web as web_util
from spack.build_environment import determine_number_of_jobs
from spack.cmd import display_specs
from spack.oci.image import (
Digest,
ImageReference,
default_config,
default_index_tag,
default_manifest,
default_tag,
tag_is_spec,
)
from spack.oci.oci import (
copy_missing_layers_with_retry,
get_manifest_and_config_with_retry,
upload_blob_with_retry,
upload_manifest_with_retry,
)
from spack.spec import Spec, save_dependency_specfiles
from spack.stage import Stage
description = "create, download and install binary packages"
section = "packaging"
@@ -58,7 +81,9 @@ def setup_parser(subparser: argparse.ArgumentParser):
push_sign.add_argument(
"--key", "-k", metavar="key", type=str, default=None, help="key for signing"
)
push.add_argument("mirror", type=str, help="mirror name, path, or URL")
push.add_argument(
"mirror", type=arguments.mirror_name_or_url, help="mirror name, path, or URL"
)
push.add_argument(
"--update-index",
"--rebuild-index",
@@ -84,7 +109,10 @@ def setup_parser(subparser: argparse.ArgumentParser):
action="store_true",
help="stop pushing on first failure (default is best effort)",
)
arguments.add_common_arguments(push, ["specs"])
push.add_argument(
"--base-image", default=None, help="specify the base image for the buildcache. "
)
arguments.add_common_arguments(push, ["specs", "jobs"])
push.set_defaults(func=push_fn)
install = subparsers.add_parser("install", help=install_fn.__doc__)
@@ -268,7 +296,22 @@ def _matching_specs(specs: List[Spec]) -> List[Spec]:
return [spack.cmd.disambiguate_spec(s, ev.active_environment(), installed=any) for s in specs]
def push_fn(args: argparse.Namespace):
def _format_spec(spec: Spec) -> str:
return spec.cformat("{name}{@version}{/hash:7}")
def _progress(i: int, total: int):
if total > 1:
digits = len(str(total))
return f"[{i+1:{digits}}/{total}] "
return ""
def _make_pool():
return multiprocessing.pool.Pool(determine_number_of_jobs(parallel=True))
def push_fn(args):
"""create a binary package and push it to a mirror"""
if args.spec_file:
tty.warn(
@@ -281,63 +324,80 @@ def push_fn(args: argparse.Namespace):
else:
specs = spack.cmd.require_active_env("buildcache push").all_specs()
mirror = arguments.mirror_name_or_url(args.mirror)
if args.allow_root:
tty.warn(
"The flag `--allow-root` is the default in Spack 0.21, will be removed in Spack 0.22"
)
url = mirror.push_url
# Check if this is an OCI image.
try:
image_ref = spack.oci.oci.image_from_mirror(args.mirror)
except ValueError:
image_ref = None
# For OCI images, we require dependencies to be pushed for now.
if image_ref:
if "dependencies" not in args.things_to_install:
tty.die("Dependencies must be pushed for OCI images.")
if not args.unsigned:
tty.warn(
"Code signing is currently not supported for OCI images. "
"Use --unsigned to silence this warning."
)
# This is a list of installed, non-external specs.
specs = bindist.specs_to_be_packaged(
specs,
root="package" in args.things_to_install,
dependencies="dependencies" in args.things_to_install,
)
url = args.mirror.push_url
# When pushing multiple specs, print the url once ahead of time, as well as how
# many specs are being pushed.
if len(specs) > 1:
tty.info(f"Selected {len(specs)} specs to push to {url}")
skipped = []
failed = []
# tty printing
color = clr.get_color_when()
format_spec = lambda s: s.format("{name}{@version}{/hash:7}", color=color)
total_specs = len(specs)
digits = len(str(total_specs))
# TODO: unify this logic in the future.
if image_ref:
with tempfile.TemporaryDirectory(
dir=spack.stage.get_stage_root()
) as tmpdir, _make_pool() as pool:
skipped = _push_oci(args, image_ref, specs, tmpdir, pool)
else:
skipped = []
for i, spec in enumerate(specs):
try:
bindist.push_or_raise(
spec,
url,
bindist.PushOptions(
force=args.force,
unsigned=args.unsigned,
key=args.key,
regenerate_index=args.update_index,
),
)
for i, spec in enumerate(specs):
try:
bindist.push_or_raise(
spec,
url,
bindist.PushOptions(
force=args.force,
unsigned=args.unsigned,
key=args.key,
regenerate_index=args.update_index,
),
)
if total_specs > 1:
msg = f"[{i+1:{digits}}/{total_specs}] Pushed {format_spec(spec)}"
else:
msg = f"Pushed {format_spec(spec)} to {url}"
msg = f"{_progress(i, len(specs))}Pushed {_format_spec(spec)}"
if len(specs) == 1:
msg += f" to {url}"
tty.info(msg)
tty.info(msg)
except bindist.NoOverwriteException:
skipped.append(_format_spec(spec))
except bindist.NoOverwriteException:
skipped.append(format_spec(spec))
# Catch any other exception unless the fail fast option is set
except Exception as e:
if args.fail_fast or isinstance(e, (bindist.PickKeyException, bindist.NoKeyException)):
raise
failed.append((format_spec(spec), e))
# Catch any other exception unless the fail fast option is set
except Exception as e:
if args.fail_fast or isinstance(
e, (bindist.PickKeyException, bindist.NoKeyException)
):
raise
failed.append((_format_spec(spec), e))
if skipped:
if len(specs) == 1:
@@ -364,6 +424,341 @@ def push_fn(args: argparse.Namespace):
),
)
# Update the index if requested
# TODO: remove update index logic out of bindist; should be once after all specs are pushed
# not once per spec.
if image_ref and len(skipped) < len(specs) and args.update_index:
with tempfile.TemporaryDirectory(
dir=spack.stage.get_stage_root()
) as tmpdir, _make_pool() as pool:
_update_index_oci(image_ref, tmpdir, pool)
def _get_spack_binary_blob(image_ref: ImageReference) -> Optional[spack.oci.oci.Blob]:
"""Get the spack tarball layer digests and size if it exists"""
try:
manifest, config = get_manifest_and_config_with_retry(image_ref)
return spack.oci.oci.Blob(
compressed_digest=Digest.from_string(manifest["layers"][-1]["digest"]),
uncompressed_digest=Digest.from_string(config["rootfs"]["diff_ids"][-1]),
size=manifest["layers"][-1]["size"],
)
except Exception:
return None
def _push_single_spack_binary_blob(image_ref: ImageReference, spec: spack.spec.Spec, tmpdir: str):
filename = os.path.join(tmpdir, f"{spec.dag_hash()}.tar.gz")
# Create an oci.image.layer aka tarball of the package
compressed_tarfile_checksum, tarfile_checksum = spack.oci.oci.create_tarball(spec, filename)
blob = spack.oci.oci.Blob(
Digest.from_sha256(compressed_tarfile_checksum),
Digest.from_sha256(tarfile_checksum),
os.path.getsize(filename),
)
# Upload the blob
upload_blob_with_retry(image_ref, file=filename, digest=blob.compressed_digest)
# delete the file
os.unlink(filename)
return blob
def _retrieve_env_dict_from_config(config: dict) -> dict:
"""Retrieve the environment variables from the image config file.
Sets a default value for PATH if it is not present.
Args:
config (dict): The image config file.
Returns:
dict: The environment variables.
"""
env = {"PATH": "/bin:/usr/bin"}
if "Env" in config.get("config", {}):
for entry in config["config"]["Env"]:
key, value = entry.split("=", 1)
env[key] = value
return env
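# For example, a base image config containing
#   {"config": {"Env": ["PATH=/opt/bin", "CC=gcc"]}}
# yields {"PATH": "/opt/bin", "CC": "gcc"}, while a config without an Env
# list falls back to {"PATH": "/bin:/usr/bin"}.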
def _archspec_to_gooarch(spec: spack.spec.Spec) -> str:
name = spec.target.family.name
name_map = {"aarch64": "arm64", "x86_64": "amd64"}
return name_map.get(name, name)
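# Maps archspec target families to the GOARCH names used in OCI image
# configs, e.g. x86_64 -> amd64 and aarch64 -> arm64; any other family
# name is passed through unchanged.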
def _put_manifest(
base_images: Dict[str, Tuple[dict, dict]],
checksums: Dict[str, spack.oci.oci.Blob],
spec: spack.spec.Spec,
image_ref: ImageReference,
tmpdir: str,
):
architecture = _archspec_to_gooarch(spec)
dependencies = list(
reversed(
list(
s
for s in spec.traverse(order="topo", deptype=("link", "run"), root=True)
if not s.external
)
)
)
base_manifest, base_config = base_images[architecture]
env = _retrieve_env_dict_from_config(base_config)
spack.user_environment.environment_modifications_for_specs(spec).apply_modifications(env)
# Create an oci.image.config file
config = copy.deepcopy(base_config)
# Add the diff ids of the dependencies
for s in dependencies:
config["rootfs"]["diff_ids"].append(str(checksums[s.dag_hash()].uncompressed_digest))
# Set the environment variables
config["config"]["Env"] = [f"{k}={v}" for k, v in env.items()]
# From the OCI v1.0 spec:
# > Any extra fields in the Image JSON struct are considered implementation
# > specific and MUST be ignored by any implementations which are unable to
# > interpret them.
# We use this to store the Spack spec, so we can use it to create an index.
spec_dict = spec.to_dict(hash=ht.dag_hash)
spec_dict["buildcache_layout_version"] = 1
spec_dict["binary_cache_checksum"] = {
"hash_algorithm": "sha256",
"hash": checksums[spec.dag_hash()].compressed_digest.digest,
}
config.update(spec_dict)
config_file = os.path.join(tmpdir, f"{spec.dag_hash()}.config.json")
with open(config_file, "w") as f:
json.dump(config, f, separators=(",", ":"))
config_file_checksum = Digest.from_sha256(
spack.util.crypto.checksum(hashlib.sha256, config_file)
)
# Upload the config file
upload_blob_with_retry(image_ref, file=config_file, digest=config_file_checksum)
oci_manifest = {
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"schemaVersion": 2,
"config": {
"mediaType": base_manifest["config"]["mediaType"],
"digest": str(config_file_checksum),
"size": os.path.getsize(config_file),
},
"layers": [
*(layer for layer in base_manifest["layers"]),
*(
{
"mediaType": "application/vnd.oci.image.layer.v1.tar+gzip",
"digest": str(checksums[s.dag_hash()].compressed_digest),
"size": checksums[s.dag_hash()].size,
}
for s in dependencies
),
],
"annotations": {"org.opencontainers.image.description": spec.format()},
}
image_ref_for_spec = image_ref.with_tag(default_tag(spec))
# Finally upload the manifest
upload_manifest_with_retry(image_ref_for_spec, oci_manifest=oci_manifest)
# delete the config file
os.unlink(config_file)
return image_ref_for_spec
def _push_oci(
args,
image_ref: ImageReference,
installed_specs_with_deps: List[Spec],
tmpdir: str,
pool: multiprocessing.pool.Pool,
) -> List[str]:
"""Push specs to an OCI registry
Args:
args: The command line arguments.
image_ref: The image reference.
installed_specs_with_deps: The installed specs to push, excluding externals,
including deps, ordered from roots to leaves.
Returns:
List[str]: The list of skipped specs (already in the buildcache).
"""
# Reverse the order
installed_specs_with_deps = list(reversed(installed_specs_with_deps))
# The base image to use for the package. When not set, we use
# the OCI registry only for storage, and do not use any base image.
base_image_ref: Optional[ImageReference] = (
ImageReference.from_string(args.base_image) if args.base_image else None
)
# Spec dag hash -> blob
checksums: Dict[str, spack.oci.oci.Blob] = {}
# arch -> (manifest, config)
base_images: Dict[str, Tuple[dict, dict]] = {}
# Specs not uploaded because they already exist
skipped = []
if not args.force:
tty.info("Checking for existing specs in the buildcache")
to_be_uploaded = []
tags_to_check = (image_ref.with_tag(default_tag(s)) for s in installed_specs_with_deps)
available_blobs = pool.map(_get_spack_binary_blob, tags_to_check)
for spec, maybe_blob in zip(installed_specs_with_deps, available_blobs):
if maybe_blob is not None:
checksums[spec.dag_hash()] = maybe_blob
skipped.append(_format_spec(spec))
else:
to_be_uploaded.append(spec)
else:
to_be_uploaded = installed_specs_with_deps
if not to_be_uploaded:
return skipped
tty.info(
f"{len(to_be_uploaded)} specs need to be pushed to {image_ref.domain}/{image_ref.name}"
)
# Upload blobs
new_blobs = pool.starmap(
_push_single_spack_binary_blob, ((image_ref, spec, tmpdir) for spec in to_be_uploaded)
)
# And update the spec to blob mapping
for spec, blob in zip(to_be_uploaded, new_blobs):
checksums[spec.dag_hash()] = blob
# Copy base image layers, probably fine to do sequentially.
for spec in to_be_uploaded:
architecture = _archspec_to_gooarch(spec)
# Get base image details, if we don't have them yet
if architecture in base_images:
continue
if base_image_ref is None:
base_images[architecture] = (default_manifest(), default_config(architecture, "linux"))
else:
base_images[architecture] = copy_missing_layers_with_retry(
base_image_ref, image_ref, architecture
)
# Upload manifests
tty.info("Uploading manifests")
pushed_image_ref = pool.starmap(
_put_manifest,
((base_images, checksums, spec, image_ref, tmpdir) for spec in to_be_uploaded),
)
# Print the image names of the top-level specs
for spec, ref in zip(to_be_uploaded, pushed_image_ref):
tty.info(f"Pushed {_format_spec(spec)} to {ref}")
return skipped
def _config_from_tag(image_ref: ImageReference, tag: str) -> Optional[dict]:
# Don't allow recursion here, since Spack itself always uploads
# vnd.oci.image.manifest.v1+json, not vnd.oci.image.index.v1+json
_, config = get_manifest_and_config_with_retry(image_ref.with_tag(tag), tag, recurse=0)
# Do very basic validation: if "spec" is a key in the config, it
# must be a Spec object too.
return config if "spec" in config else None
def _update_index_oci(
image_ref: ImageReference, tmpdir: str, pool: multiprocessing.pool.Pool
) -> None:
response = spack.oci.opener.urlopen(urllib.request.Request(url=image_ref.tags_url()))
spack.oci.opener.ensure_status(response, 200)
tags = json.load(response)["tags"]
# Fetch all image config files in parallel
spec_dicts = pool.starmap(
_config_from_tag, ((image_ref, tag) for tag in tags if tag_is_spec(tag))
)
# Populate the database
db_root_dir = os.path.join(tmpdir, "db_root")
db = bindist.BuildCacheDatabase(db_root_dir)
for spec_dict in spec_dicts:
spec = Spec.from_dict(spec_dict)
db.add(spec, directory_layout=None)
db.mark(spec, "in_buildcache", True)
# Create the index.json file
index_json_path = os.path.join(tmpdir, "index.json")
with open(index_json_path, "w") as f:
db._write_to_file(f)
# Create an empty config.json file
empty_config_json_path = os.path.join(tmpdir, "config.json")
with open(empty_config_json_path, "wb") as f:
f.write(b"{}")
# Upload the index.json file
index_shasum = Digest.from_sha256(spack.util.crypto.checksum(hashlib.sha256, index_json_path))
upload_blob_with_retry(image_ref, file=index_json_path, digest=index_shasum)
# Upload the config.json file
empty_config_digest = Digest.from_sha256(
spack.util.crypto.checksum(hashlib.sha256, empty_config_json_path)
)
upload_blob_with_retry(image_ref, file=empty_config_json_path, digest=empty_config_digest)
# Push a manifest file that references the index.json file as a layer
# Notice that we push this as if it is an image, which it of course is not.
# When the ORAS spec becomes official, we can use that instead of a fake image.
# For now we just use the OCI image spec, so that we don't run into issues with
# automatic garbage collection of blobs that are not referenced by any image manifest.
oci_manifest = {
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"schemaVersion": 2,
# Config is just an empty {} file for now, and irrelevant
"config": {
"mediaType": "application/vnd.oci.image.config.v1+json",
"digest": str(empty_config_digest),
"size": os.path.getsize(empty_config_json_path),
},
# The buildcache index is the only layer, and is not a tarball, we lie here.
"layers": [
{
"mediaType": "application/vnd.oci.image.layer.v1.tar+gzip",
"digest": str(index_shasum),
"size": os.path.getsize(index_json_path),
}
],
}
upload_manifest_with_retry(image_ref.with_tag(default_index_tag), oci_manifest)
def install_fn(args):
"""install from a binary package"""
@@ -522,7 +917,7 @@ def copy_buildcache_file(src_url, dest_url, local_path=None):
local_path = os.path.join(tmpdir, os.path.basename(src_url))
try:
temp_stage = Stage(src_url, path=os.path.dirname(local_path))
temp_stage = spack.stage.Stage(src_url, path=os.path.dirname(local_path))
try:
temp_stage.create()
temp_stage.fetch()
@@ -616,6 +1011,20 @@ def manifest_copy(manifest_file_list):
def update_index(mirror: spack.mirror.Mirror, update_keys=False):
# Special case OCI images for now.
try:
image_ref = spack.oci.oci.image_from_mirror(mirror)
except ValueError:
image_ref = None
if image_ref:
with tempfile.TemporaryDirectory(
dir=spack.stage.get_stage_root()
) as tmpdir, _make_pool() as pool:
_update_index_oci(image_ref, tmpdir, pool)
return
# Otherwise, assume a normal mirror.
url = mirror.push_url
bindist.generate_package_index(url_util.join(url, bindist.build_cache_relative_path()))


@@ -543,7 +543,7 @@ def add_concretizer_args(subparser):
)
def add_s3_connection_args(subparser, add_help):
def add_connection_args(subparser, add_help):
subparser.add_argument(
"--s3-access-key-id", help="ID string to use to connect to this S3 mirror"
)
@@ -559,6 +559,8 @@ def add_s3_connection_args(subparser, add_help):
subparser.add_argument(
"--s3-endpoint-url", help="endpoint URL to use to connect to this S3 mirror"
)
subparser.add_argument("--oci-username", help="username to use to connect to this OCI mirror")
subparser.add_argument("--oci-password", help="password to use to connect to this OCI mirror")
def use_buildcache(cli_arg_value):


@@ -111,7 +111,7 @@ def setup_parser(subparser):
"and source use `--type binary --type source` (default)"
),
)
arguments.add_s3_connection_args(add_parser, False)
arguments.add_connection_args(add_parser, False)
# Remove
remove_parser = sp.add_parser("remove", aliases=["rm"], help=mirror_remove.__doc__)
remove_parser.add_argument("name", help="mnemonic name for mirror", metavar="mirror")
@@ -141,7 +141,7 @@ def setup_parser(subparser):
default=spack.config.default_modify_scope(),
help="configuration scope to modify",
)
arguments.add_s3_connection_args(set_url_parser, False)
arguments.add_connection_args(set_url_parser, False)
# Set
set_parser = sp.add_parser("set", help=mirror_set.__doc__)
@@ -170,7 +170,7 @@ def setup_parser(subparser):
default=spack.config.default_modify_scope(),
help="configuration scope to modify",
)
arguments.add_s3_connection_args(set_parser, False)
arguments.add_connection_args(set_parser, False)
# List
list_parser = sp.add_parser("list", help=mirror_list.__doc__)
@@ -192,6 +192,8 @@ def mirror_add(args):
or args.s3_profile
or args.s3_endpoint_url
or args.type
or args.oci_username
or args.oci_password
):
connection = {"url": args.url}
if args.s3_access_key_id and args.s3_access_key_secret:
@@ -202,6 +204,8 @@ def mirror_add(args):
connection["profile"] = args.s3_profile
if args.s3_endpoint_url:
connection["endpoint_url"] = args.s3_endpoint_url
if args.oci_username and args.oci_password:
connection["access_pair"] = [args.oci_username, args.oci_password]
if args.type:
connection["binary"] = "binary" in args.type
connection["source"] = "source" in args.type
@@ -235,6 +239,8 @@ def _configure_mirror(args):
changes["profile"] = args.s3_profile
if args.s3_endpoint_url:
changes["endpoint_url"] = args.s3_endpoint_url
if args.oci_username and args.oci_password:
changes["access_pair"] = [args.oci_username, args.oci_password]
# argparse cannot distinguish between --binary and --no-binary when same dest :(
# notice that set-url does not have these args, so getattr


@@ -28,6 +28,7 @@
import os.path
import re
import shutil
import urllib.error
import urllib.parse
from typing import List, Optional
@@ -41,6 +42,7 @@
import spack.config
import spack.error
import spack.oci.opener
import spack.url
import spack.util.crypto as crypto
import spack.util.git
@@ -537,6 +539,34 @@ def fetch(self):
tty.msg("Using cached archive: {0}".format(path))
class OCIRegistryFetchStrategy(URLFetchStrategy):
def __init__(self, url=None, checksum=None, **kwargs):
super().__init__(url, checksum, **kwargs)
self._urlopen = kwargs.get("_urlopen", spack.oci.opener.urlopen)
@_needs_stage
def fetch(self):
file = self.stage.save_filename
tty.msg(f"Fetching {self.url}")
try:
response = self._urlopen(self.url)
except urllib.error.URLError as e:
# clean up archive on failure.
if self.archive_file:
os.remove(self.archive_file)
if os.path.lexists(file):
os.remove(file)
raise FailedDownloadError(self.url, f"Failed to fetch {self.url}: {e}") from e
if os.path.lexists(file):
os.remove(file)
with open(file, "wb") as f:
shutil.copyfileobj(response, f)
class VCSFetchStrategy(FetchStrategy):
"""Superclass for version control system fetch strategies.


@@ -18,7 +18,7 @@
import sys
import traceback
import urllib.parse
from typing import Optional, Union
from typing import List, Optional, Union
import llnl.url
import llnl.util.tty as tty
@@ -27,18 +27,18 @@
import spack.caches
import spack.config
import spack.error
import spack.fetch_strategy as fs
import spack.fetch_strategy
import spack.mirror
import spack.oci.image
import spack.spec
import spack.util.path
import spack.util.spack_json as sjson
import spack.util.spack_yaml as syaml
import spack.util.url as url_util
from spack.util.spack_yaml import syaml_dict
from spack.version import VersionList
import spack.version
#: What schemes do we support
supported_url_schemes = ("file", "http", "https", "sftp", "ftp", "s3", "gs")
supported_url_schemes = ("file", "http", "https", "sftp", "ftp", "s3", "gs", "oci")
def _url_or_path_to_url(url_or_path: str) -> str:
@@ -230,12 +230,12 @@ def _get_value(self, attribute: str, direction: str):
value = self._data.get(direction, {})
# Return top-level entry if only a URL was set.
if isinstance(value, str):
return self._data.get(attribute, None)
if isinstance(value, str) or attribute not in value:
return self._data.get(attribute)
return self._data.get(direction, {}).get(attribute, None)
return value[attribute]
def get_url(self, direction: str):
def get_url(self, direction: str) -> str:
if direction not in ("fetch", "push"):
raise ValueError(f"direction must be either 'fetch' or 'push', not {direction}")
@@ -255,18 +255,21 @@ def get_url(self, direction: str):
elif "url" in info:
url = info["url"]
return _url_or_path_to_url(url) if url else None
if not url:
raise ValueError(f"Mirror {self.name} has no URL configured")
def get_access_token(self, direction: str):
return _url_or_path_to_url(url)
def get_access_token(self, direction: str) -> Optional[str]:
return self._get_value("access_token", direction)
def get_access_pair(self, direction: str):
def get_access_pair(self, direction: str) -> Optional[List]:
return self._get_value("access_pair", direction)
def get_profile(self, direction: str):
def get_profile(self, direction: str) -> Optional[str]:
return self._get_value("profile", direction)
def get_endpoint_url(self, direction: str):
def get_endpoint_url(self, direction: str) -> Optional[str]:
return self._get_value("endpoint_url", direction)
@@ -330,7 +333,7 @@ def from_json(stream, name=None):
raise sjson.SpackJSONError("error parsing JSON mirror collection:", str(e)) from e
def to_dict(self, recursive=False):
return syaml_dict(
return syaml.syaml_dict(
sorted(
((k, (v.to_dict() if recursive else v)) for (k, v) in self._mirrors.items()),
key=operator.itemgetter(0),
@@ -372,7 +375,7 @@ def __len__(self):
def _determine_extension(fetcher):
if isinstance(fetcher, fs.URLFetchStrategy):
if isinstance(fetcher, spack.fetch_strategy.URLFetchStrategy):
if fetcher.expand_archive:
# If we fetch with a URLFetchStrategy, use URL's archive type
ext = llnl.url.determine_url_file_extension(fetcher.url)
@@ -437,6 +440,19 @@ def __iter__(self):
yield self.cosmetic_path
class OCIImageLayout:
"""Follow the OCI Image Layout Specification to archive blobs
Paths are of the form `blobs/<algorithm>/<digest>`
"""
def __init__(self, digest: spack.oci.image.Digest) -> None:
self.storage_path = os.path.join("blobs", digest.algorithm, digest.digest)
def __iter__(self):
yield self.storage_path
def mirror_archive_paths(fetcher, per_package_ref, spec=None):
"""Returns a ``MirrorReference`` object which keeps track of the relative
storage path of the resource associated with the specified ``fetcher``."""
@@ -482,7 +498,7 @@ def get_all_versions(specs):
for version in pkg_cls.versions:
version_spec = spack.spec.Spec(pkg_cls.name)
version_spec.versions = VersionList([version])
version_spec.versions = spack.version.VersionList([version])
version_specs.append(version_spec)
return version_specs
@@ -521,7 +537,7 @@ def get_matching_versions(specs, num_versions=1):
# Generate only versions that satisfy the spec.
if spec.concrete or v.intersects(spec.versions):
s = spack.spec.Spec(pkg.name)
s.versions = VersionList([v])
s.versions = spack.version.VersionList([v])
s.variants = spec.variants.copy()
# This is needed to avoid hanging references during the
# concretization phase
@@ -591,14 +607,14 @@ def add(mirror: Mirror, scope=None):
"""Add a named mirror in the given scope"""
mirrors = spack.config.get("mirrors", scope=scope)
if not mirrors:
mirrors = syaml_dict()
mirrors = syaml.syaml_dict()
if mirror.name in mirrors:
tty.die("Mirror with name {} already exists.".format(mirror.name))
items = [(n, u) for n, u in mirrors.items()]
items.insert(0, (mirror.name, mirror.to_dict()))
mirrors = syaml_dict(items)
mirrors = syaml.syaml_dict(items)
spack.config.set("mirrors", mirrors, scope=scope)
@@ -606,7 +622,7 @@ def remove(name, scope):
"""Remove the named mirror in the given scope"""
mirrors = spack.config.get("mirrors", scope=scope)
if not mirrors:
mirrors = syaml_dict()
mirrors = syaml.syaml_dict()
if name not in mirrors:
tty.die("No mirror with name %s" % name)


@@ -0,0 +1,4 @@
# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)


@@ -0,0 +1,228 @@
# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import re
import urllib.parse
from typing import Optional, Union
import spack.spec
# all the building blocks
alphanumeric = r"[a-z0-9]+"
separator = r"(?:[._]|__|[-]+)"
localhost = r"localhost"
domainNameComponent = r"(?:[a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9-]*[a-zA-Z0-9])"
optionalPort = r"(?::[0-9]+)?"
tag = r"[\w][\w.-]{0,127}"
digestPat = r"[A-Za-z][A-Za-z0-9]*(?:[-_+.][A-Za-z][A-Za-z0-9]*)*[:][0-9a-fA-F]{32,}"
ipv6address = r"\[(?:[a-fA-F0-9:]+)\]"
# domain name
domainName = rf"{domainNameComponent}(?:\.{domainNameComponent})*"
host = rf"(?:{domainName}|{ipv6address})"
domainAndPort = rf"{host}{optionalPort}"
# image name
pathComponent = rf"{alphanumeric}(?:{separator}{alphanumeric})*"
remoteName = rf"{pathComponent}(?:\/{pathComponent})*"
namePat = rf"(?:{domainAndPort}\/)?{remoteName}"
# Regex for a full image reference, with 3 groups: name, tag, digest
referencePat = re.compile(rf"^({namePat})(?::({tag}))?(?:@({digestPat}))?$")
# Regex for splitting the name into domain and path components
anchoredNameRegexp = re.compile(rf"^(?:({domainAndPort})\/)?({remoteName})$")
def ensure_sha256_checksum(oci_blob: str):
"""Validate that the reference is of the format sha256:<checksum>
Return the checksum if valid, raise ValueError otherwise."""
if ":" not in oci_blob:
raise ValueError(f"Invalid OCI blob format: {oci_blob}")
alg, checksum = oci_blob.split(":", 1)
if alg != "sha256":
raise ValueError(f"Unsupported OCI blob checksum algorithm: {alg}")
if len(checksum) != 64:
raise ValueError(f"Invalid OCI blob checksum length: {len(checksum)}")
return checksum
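# For example (illustrative value): ensure_sha256_checksum("sha256:" + "a" * 64)
# returns the 64-character hex checksum, whereas any other algorithm or a
# checksum of the wrong length raises ValueError.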
class Digest:
"""Represents a digest in the format <algorithm>:<digest>.
Currently only supports sha256 digests."""
__slots__ = ["algorithm", "digest"]
def __init__(self, *, algorithm: str, digest: str) -> None:
self.algorithm = algorithm
self.digest = digest
def __eq__(self, __value: object) -> bool:
if not isinstance(__value, Digest):
return NotImplemented
return self.algorithm == __value.algorithm and self.digest == __value.digest
@classmethod
def from_string(cls, string: str) -> "Digest":
return cls(algorithm="sha256", digest=ensure_sha256_checksum(string))
@classmethod
def from_sha256(cls, digest: str) -> "Digest":
return cls(algorithm="sha256", digest=digest)
def __str__(self) -> str:
return f"{self.algorithm}:{self.digest}"
class ImageReference:
"""A parsed image of the form domain/name:tag[@digest].
The digest is optional, and domain and tag are automatically
filled out with defaults when parsed from string."""
__slots__ = ["domain", "name", "tag", "digest"]
def __init__(
self, *, domain: str, name: str, tag: str = "latest", digest: Optional[Digest] = None
):
self.domain = domain
self.name = name
self.tag = tag
self.digest = digest
@classmethod
def from_string(cls, string) -> "ImageReference":
match = referencePat.match(string)
if not match:
raise ValueError(f"Invalid image reference: {string}")
image, tag, digest = match.groups()
assert isinstance(image, str)
assert isinstance(tag, (str, type(None)))
assert isinstance(digest, (str, type(None)))
match = anchoredNameRegexp.match(image)
# This can never happen, since the anchored name regex is implied
# by the full reference regex above. It's just here to make mypy happy.
assert match, f"Invalid image reference: {string}"
domain, name = match.groups()
assert isinstance(domain, (str, type(None)))
assert isinstance(name, str)
# Fill out defaults like docker would do...
# Based on github.com/distribution/distribution: allow short names like "ubuntu"
# and "user/repo" to be interpreted as "library/ubuntu" and "user/repo:latest
# Not sure if Spack should follow Docker, but it's what people expect...
if not domain:
domain = "index.docker.io"
name = f"library/{name}"
elif (
"." not in domain
and ":" not in domain
and domain != "localhost"
and domain == domain.lower()
):
name = f"{domain}/{name}"
domain = "index.docker.io"
if not tag:
tag = "latest"
# sha256 is currently the only algorithm that
# we implement, even though the spec allows for more
if isinstance(digest, str):
digest = Digest.from_string(digest)
return cls(domain=domain, name=name, tag=tag, digest=digest)
def manifest_url(self) -> str:
digest_or_tag = self.digest or self.tag
return f"https://{self.domain}/v2/{self.name}/manifests/{digest_or_tag}"
def blob_url(self, digest: Union[str, Digest]) -> str:
if isinstance(digest, str):
digest = Digest.from_string(digest)
return f"https://{self.domain}/v2/{self.name}/blobs/{digest}"
def with_digest(self, digest: Union[str, Digest]) -> "ImageReference":
if isinstance(digest, str):
digest = Digest.from_string(digest)
return ImageReference(domain=self.domain, name=self.name, tag=self.tag, digest=digest)
def with_tag(self, tag: str) -> "ImageReference":
return ImageReference(domain=self.domain, name=self.name, tag=tag, digest=self.digest)
def uploads_url(self, digest: Optional[Digest] = None) -> str:
url = f"https://{self.domain}/v2/{self.name}/blobs/uploads/"
if digest:
url += f"?digest={digest}"
return url
def tags_url(self) -> str:
return f"https://{self.domain}/v2/{self.name}/tags/list"
def endpoint(self, path: str = "") -> str:
return urllib.parse.urljoin(f"https://{self.domain}/v2/", path)
def __str__(self) -> str:
s = f"{self.domain}/{self.name}"
if self.tag:
s += f":{self.tag}"
if self.digest:
s += f"@{self.digest}"
return s
def __eq__(self, __value: object) -> bool:
if not isinstance(__value, ImageReference):
return NotImplemented
return (
self.domain == __value.domain
and self.name == __value.name
and self.tag == __value.tag
and self.digest == __value.digest
)
def _ensure_valid_tag(tag: str) -> str:
"""Ensure a tag is valid for an OCI registry."""
sanitized = re.sub(r"[^\w.-]", "_", tag)
if len(sanitized) > 128:
return sanitized[:64] + sanitized[-64:]
return sanitized
def default_tag(spec: "spack.spec.Spec") -> str:
"""Return a valid, default image tag for a spec."""
return _ensure_valid_tag(f"{spec.name}-{spec.version}-{spec.dag_hash()}.spack")
#: Default OCI index tag
default_index_tag = "index.spack"
def tag_is_spec(tag: str) -> bool:
"""Check if a tag is likely a Spec"""
return tag.endswith(".spack") and tag != default_index_tag
def default_config(architecture: str, os: str):
return {
"architecture": architecture,
"os": os,
"rootfs": {"type": "layers", "diff_ids": []},
"config": {"Env": []},
}
def default_manifest():
return {
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"schemaVersion": 2,
"config": {"mediaType": "application/vnd.oci.image.config.v1+json"},
"layers": [],
}
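A minimal sketch (not part of the diff) of how the parsing rules above behave, assuming the module is importable as `spack.oci.image`; the image names are made up:

```python
from spack.oci.image import Digest, ImageReference

# Short names follow the Docker conventions: registry and library/ are filled in
ref = ImageReference.from_string("ubuntu:22.04")
assert (ref.domain, ref.name, ref.tag) == ("index.docker.io", "library/ubuntu", "22.04")

# Fully qualified references keep their domain; tag and digest are optional
ref = ImageReference.from_string(f"ghcr.io/user/image@sha256:{'a' * 64}")
assert ref.tag == "latest"
assert ref.digest == Digest.from_sha256("a" * 64)

# URLs for the OCI distribution API are derived from the reference
assert ref.blob_url(ref.digest) == f"https://ghcr.io/v2/user/image/blobs/sha256:{'a' * 64}"
```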

lib/spack/spack/oci/oci.py

@@ -0,0 +1,381 @@
# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import hashlib
import json
import os
import time
import urllib.error
import urllib.parse
import urllib.request
from http.client import HTTPResponse
from typing import NamedTuple, Tuple
from urllib.request import Request
import llnl.util.tty as tty
import spack.binary_distribution
import spack.config
import spack.error
import spack.fetch_strategy
import spack.mirror
import spack.oci.opener
import spack.repo
import spack.spec
import spack.stage
import spack.traverse
import spack.util.crypto
from .image import Digest, ImageReference
class Blob(NamedTuple):
compressed_digest: Digest
uncompressed_digest: Digest
size: int
def create_tarball(spec: spack.spec.Spec, tarfile_path):
buildinfo = spack.binary_distribution.get_buildinfo_dict(spec)
return spack.binary_distribution._do_create_tarball(tarfile_path, spec.prefix, buildinfo)
def _log_upload_progress(digest: Digest, size: int, elapsed: float):
elapsed = max(elapsed, 0.001) # guard against division by zero
tty.info(f"Uploaded {digest} ({elapsed:.2f}s, {size / elapsed / 1024 / 1024:.2f} MB/s)")
def with_query_param(url: str, param: str, value: str) -> str:
"""Add a query parameter to a URL
Args:
url: The URL to add the parameter to.
param: The parameter name.
value: The parameter value.
Returns:
The URL with the parameter added.
"""
parsed = urllib.parse.urlparse(url)
query = urllib.parse.parse_qs(parsed.query)
if param in query:
query[param].append(value)
else:
query[param] = [value]
return urllib.parse.urlunparse(
parsed._replace(query=urllib.parse.urlencode(query, doseq=True))
)
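For example (a sketch; the upload session URL is made up), the digest parameter is appended and percent-encoded:

```python
from spack.oci.oci import with_query_param

assert (
    with_query_param("https://example.com/upload?uuid=1234", "digest", "sha256:abc")
    == "https://example.com/upload?uuid=1234&digest=sha256%3Aabc"
)
```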
def upload_blob(
ref: ImageReference,
file: str,
digest: Digest,
force: bool = False,
small_file_size: int = 0,
_urlopen: spack.oci.opener.MaybeOpen = None,
) -> bool:
"""Uploads a blob to an OCI registry
We only do monolithic uploads, even though it's very simple to do chunked.
Observed problems with chunked uploads:
(1) it's slow, many sequential requests, (2) some registries set an *unknown*
max chunk size, and the spec doesn't say how to obtain it
Args:
ref: The image reference.
file: The file to upload.
digest: The digest of the file.
force: Whether to force upload the blob, even if it already exists.
small_file_size: For files at most this size, attempt
to do a single POST request instead of POST + PUT.
Some registries do not support single POST requests, and others
do not specify what size they support in single POST.
For now this feature is disabled by default (0KB)
Returns:
True if the blob was uploaded, False if it already existed.
"""
_urlopen = _urlopen or spack.oci.opener.urlopen
# Test if the blob already exists, if so, early exit.
if not force and blob_exists(ref, digest, _urlopen):
return False
start = time.time()
with open(file, "rb") as f:
file_size = os.fstat(f.fileno()).st_size
# For small blobs, do a single POST request.
# The spec says that registries MAY support this
if file_size <= small_file_size:
request = Request(
url=ref.uploads_url(digest),
method="POST",
data=f,
headers={
"Content-Type": "application/octet-stream",
"Content-Length": str(file_size),
},
)
else:
request = Request(
url=ref.uploads_url(), method="POST", headers={"Content-Length": "0"}
)
response = _urlopen(request)
# Created the blob in one go.
if response.status == 201:
_log_upload_progress(digest, file_size, time.time() - start)
return True
# Otherwise, do another PUT request.
spack.oci.opener.ensure_status(response, 202)
assert "Location" in response.headers
# Can be absolute or relative, joining handles both
upload_url = with_query_param(
ref.endpoint(response.headers["Location"]), "digest", str(digest)
)
f.seek(0)
response = _urlopen(
Request(
url=upload_url,
method="PUT",
data=f,
headers={
"Content-Type": "application/octet-stream",
"Content-Length": str(file_size),
},
)
)
spack.oci.opener.ensure_status(response, 201)
# Print the elapsed time and the upload speed in MB/s
_log_upload_progress(digest, file_size, time.time() - start)
return True
def upload_manifest(
ref: ImageReference,
oci_manifest: dict,
tag: bool = True,
_urlopen: spack.oci.opener.MaybeOpen = None,
):
"""Uploads a manifest/index to a registry
Args:
ref: The image reference.
oci_manifest: The OCI manifest or index.
tag: When true, use the tag, otherwise use the digest,
this is relevant for multi-arch images, where the
tag is an index, referencing the manifests by digest.
Returns:
The digest and size of the uploaded manifest.
"""
_urlopen = _urlopen or spack.oci.opener.urlopen
data = json.dumps(oci_manifest, separators=(",", ":")).encode()
digest = Digest.from_sha256(hashlib.sha256(data).hexdigest())
size = len(data)
if not tag:
ref = ref.with_digest(digest)
response = _urlopen(
Request(
url=ref.manifest_url(),
method="PUT",
data=data,
headers={"Content-Type": oci_manifest["mediaType"]},
)
)
spack.oci.opener.ensure_status(response, 201)
return digest, size
def image_from_mirror(mirror: spack.mirror.Mirror) -> ImageReference:
"""Given an OCI based mirror, extract the URL and image name from it"""
url = mirror.push_url
if not url.startswith("oci://"):
raise ValueError(f"Mirror {mirror} is not an OCI mirror")
return ImageReference.from_string(url[6:])
def blob_exists(
ref: ImageReference, digest: Digest, _urlopen: spack.oci.opener.MaybeOpen = None
) -> bool:
"""Checks if a blob exists in an OCI registry"""
try:
_urlopen = _urlopen or spack.oci.opener.urlopen
response = _urlopen(Request(url=ref.blob_url(digest), method="HEAD"))
return response.status == 200
except urllib.error.HTTPError as e:
if e.getcode() == 404:
return False
raise
def copy_missing_layers(
src: ImageReference,
dst: ImageReference,
architecture: str,
_urlopen: spack.oci.opener.MaybeOpen = None,
) -> Tuple[dict, dict]:
"""Copy image layers from src to dst for given architecture.
Args:
src: The source image reference.
dst: The destination image reference.
architecture: The architecture (when referencing an index)
Returns:
Tuple of manifest and config of the base image.
"""
_urlopen = _urlopen or spack.oci.opener.urlopen
manifest, config = get_manifest_and_config(src, architecture, _urlopen=_urlopen)
# Get layer digests
digests = [Digest.from_string(layer["digest"]) for layer in manifest["layers"]]
# Filter out the digests that don't exist in the destination registry
missing_digests = [
digest for digest in digests if not blob_exists(dst, digest, _urlopen=_urlopen)
]
if not missing_digests:
return manifest, config
# Pull missing blobs, push them to the registry
with spack.stage.StageComposite.from_iterable(
make_stage(url=src.blob_url(digest), digest=digest, _urlopen=_urlopen)
for digest in missing_digests
) as stages:
stages.fetch()
stages.check()
stages.cache_local()
for stage, digest in zip(stages, missing_digests):
# No need to check existence again, force=True.
upload_blob(
dst, file=stage.save_filename, force=True, digest=digest, _urlopen=_urlopen
)
return manifest, config
#: OCI manifest content types (including docker type)
manifest_content_type = [
"application/vnd.oci.image.manifest.v1+json",
"application/vnd.docker.distribution.manifest.v2+json",
]
#: OCI index content types (including docker type)
index_content_type = [
"application/vnd.oci.image.index.v1+json",
"application/vnd.docker.distribution.manifest.list.v2+json",
]
#: All OCI manifest / index content types
all_content_type = manifest_content_type + index_content_type
def get_manifest_and_config(
ref: ImageReference,
architecture="amd64",
recurse=3,
_urlopen: spack.oci.opener.MaybeOpen = None,
) -> Tuple[dict, dict]:
"""Recursively fetch manifest and config for a given image reference
with a given architecture.
Args:
ref: The image reference.
architecture: The architecture (when referencing an index)
recurse: How many levels of index to recurse into.
Returns:
A tuple of (manifest, config)"""
_urlopen = _urlopen or spack.oci.opener.urlopen
# Get manifest
response: HTTPResponse = _urlopen(
Request(url=ref.manifest_url(), headers={"Accept": ", ".join(all_content_type)})
)
# Recurse when we find an index
if response.headers["Content-Type"] in index_content_type:
if recurse == 0:
raise Exception("Maximum recursion depth reached while fetching OCI manifest")
index = json.load(response)
manifest_meta = next(
manifest
for manifest in index["manifests"]
if manifest["platform"]["architecture"] == architecture
)
return get_manifest_and_config(
ref.with_digest(manifest_meta["digest"]),
architecture=architecture,
recurse=recurse - 1,
_urlopen=_urlopen,
)
# Otherwise, require a manifest
if response.headers["Content-Type"] not in manifest_content_type:
raise Exception(f"Unknown content type {response.headers['Content-Type']}")
manifest = json.load(response)
# Download, verify and cache config file
config_digest = Digest.from_string(manifest["config"]["digest"])
with make_stage(ref.blob_url(config_digest), config_digest, _urlopen=_urlopen) as stage:
stage.fetch()
stage.check()
stage.cache_local()
with open(stage.save_filename, "rb") as f:
config = json.load(f)
return manifest, config
#: Same as upload_manifest, but with retry wrapper
upload_manifest_with_retry = spack.oci.opener.default_retry(upload_manifest)
#: Same as upload_blob, but with retry wrapper
upload_blob_with_retry = spack.oci.opener.default_retry(upload_blob)
#: Same as get_manifest_and_config, but with retry wrapper
get_manifest_and_config_with_retry = spack.oci.opener.default_retry(get_manifest_and_config)
#: Same as copy_missing_layers, but with retry wrapper
copy_missing_layers_with_retry = spack.oci.opener.default_retry(copy_missing_layers)
def make_stage(
url: str, digest: Digest, keep: bool = False, _urlopen: spack.oci.opener.MaybeOpen = None
) -> spack.stage.Stage:
_urlopen = _urlopen or spack.oci.opener.urlopen
fetch_strategy = spack.fetch_strategy.OCIRegistryFetchStrategy(
url, checksum=digest.digest, _urlopen=_urlopen
)
# Use blobs/<alg>/<encoded> as the cache path, which follows
# the OCI Image Layout Specification. What's missing though,
# is the `oci-layout` and `index.json` files, which are
# required by the spec.
return spack.stage.Stage(
fetch_strategy,
mirror_paths=spack.mirror.OCIImageLayout(digest),
name=digest.digest,
keep=keep,
)
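Putting these helpers together, a rough sketch of the low-level push flow; the registry, tag and file paths are hypothetical (`spack buildcache push` drives this for real):

```python
import hashlib
import json
import os

from spack.oci.image import Digest, ImageReference, default_config, default_manifest
from spack.oci.oci import upload_blob_with_retry, upload_manifest_with_retry

image = ImageReference.from_string("registry.example.com/buildcache:mytag")
tarball = "/tmp/layer.tar.gz"  # hypothetical gzipped layer

with open(tarball, "rb") as f:
    layer_digest = Digest.from_sha256(hashlib.sha256(f.read()).hexdigest())

# Blob upload is skipped if the registry already has the blob
upload_blob_with_retry(image, file=tarball, digest=layer_digest)

# Minimal config blob; diff_ids would normally list the *uncompressed* layer digests
config_blob = json.dumps(default_config(architecture="amd64", os="linux")).encode()
config_digest = Digest.from_sha256(hashlib.sha256(config_blob).hexdigest())
config_path = "/tmp/config.json"
with open(config_path, "wb") as f:
    f.write(config_blob)
upload_blob_with_retry(image, file=config_path, digest=config_digest)

# Manifest referencing config + layer, stored under the tag "mytag"
manifest = default_manifest()
manifest["config"].update({"digest": str(config_digest), "size": len(config_blob)})
manifest["layers"].append(
    {
        "mediaType": "application/vnd.oci.image.layer.v1.tar+gzip",
        "digest": str(layer_digest),
        "size": os.path.getsize(tarball),
    }
)
upload_manifest_with_retry(image, manifest)
```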


@@ -0,0 +1,442 @@
# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
"""All the logic for OCI fetching and authentication"""
import base64
import json
import re
import time
import urllib.error
import urllib.parse
import urllib.request
from enum import Enum, auto
from http.client import HTTPResponse
from typing import Callable, Dict, Iterable, List, NamedTuple, Optional, Tuple
from urllib.request import Request
import llnl.util.lang
import spack.config
import spack.mirror
import spack.parser
import spack.repo
import spack.util.web
from .image import ImageReference
def _urlopen():
opener = create_opener()
def dispatch_open(fullurl, data=None, timeout=None):
timeout = timeout or spack.config.get("config:connect_timeout", 10)
return opener.open(fullurl, data, timeout)
return dispatch_open
OpenType = Callable[..., HTTPResponse]
MaybeOpen = Optional[OpenType]
#: Opener that automatically uses OCI authentication based on mirror config
urlopen: OpenType = llnl.util.lang.Singleton(_urlopen)
SP = r" "
OWS = r"[ \t]*"
BWS = OWS
HTAB = r"\t"
VCHAR = r"\x21-\x7E"
tchar = r"[!#$%&'*+\-.^_`|~0-9A-Za-z]"
token = rf"{tchar}+"
obs_text = r"\x80-\xFF"
qdtext = rf"[{HTAB}{SP}\x21\x23-\x5B\x5D-\x7E{obs_text}]"
quoted_pair = rf"\\([{HTAB}{SP}{VCHAR}{obs_text}])"
quoted_string = rf'"(?:({qdtext}*)|{quoted_pair})*"'
class TokenType(spack.parser.TokenBase):
AUTH_PARAM = rf"({token}){BWS}={BWS}({token}|{quoted_string})"
# TOKEN68 = r"([A-Za-z0-9\-._~+/]+=*)" # todo... support this?
TOKEN = rf"{tchar}+"
EQUALS = rf"{BWS}={BWS}"
COMMA = rf"{OWS},{OWS}"
SPACE = r" +"
EOF = r"$"
ANY = r"."
TOKEN_REGEXES = [rf"(?P<{token}>{token.regex})" for token in TokenType]
ALL_TOKENS = re.compile("|".join(TOKEN_REGEXES))
class State(Enum):
CHALLENGE = auto()
AUTH_PARAM_LIST_START = auto()
AUTH_PARAM = auto()
NEXT_IN_LIST = auto()
AUTH_PARAM_OR_SCHEME = auto()
def tokenize(input: str):
scanner = ALL_TOKENS.scanner(input) # type: ignore[attr-defined]
for match in iter(scanner.match, None): # type: ignore[var-annotated]
yield spack.parser.Token(
TokenType.__members__[match.lastgroup], # type: ignore[attr-defined]
match.group(), # type: ignore[attr-defined]
match.start(), # type: ignore[attr-defined]
match.end(), # type: ignore[attr-defined]
)
class Challenge:
__slots__ = ["scheme", "params"]
def __init__(
self, scheme: Optional[str] = None, params: Optional[List[Tuple[str, str]]] = None
) -> None:
self.scheme = scheme or ""
self.params = params or []
def __repr__(self) -> str:
return f"Challenge({self.scheme}, {self.params})"
def __eq__(self, other: object) -> bool:
return (
isinstance(other, Challenge)
and self.scheme == other.scheme
and self.params == other.params
)
def parse_www_authenticate(input: str):
"""Very basic parsing of www-authenticate parsing (RFC7235 section 4.1)
Notice: this omits token68 support."""
# auth-scheme = token
# auth-param = token BWS "=" BWS ( token / quoted-string )
# challenge = auth-scheme [ 1*SP ( token68 / #auth-param ) ]
# WWW-Authenticate = 1#challenge
challenges: List[Challenge] = []
_unquote = re.compile(quoted_pair).sub
unquote = lambda s: _unquote(r"\1", s[1:-1])
mode: State = State.CHALLENGE
tokens = tokenize(input)
current_challenge = Challenge()
def extract_auth_param(input: str) -> Tuple[str, str]:
key, value = input.split("=", 1)
key = key.rstrip()
value = value.lstrip()
if value.startswith('"'):
value = unquote(value)
return key, value
while True:
token: spack.parser.Token = next(tokens)
if mode == State.CHALLENGE:
if token.kind == TokenType.EOF:
raise ValueError(token)
elif token.kind == TokenType.TOKEN:
current_challenge.scheme = token.value
mode = State.AUTH_PARAM_LIST_START
else:
raise ValueError(token)
elif mode == State.AUTH_PARAM_LIST_START:
if token.kind == TokenType.EOF:
challenges.append(current_challenge)
break
elif token.kind == TokenType.COMMA:
# Challenge without param list, followed by another challenge.
challenges.append(current_challenge)
current_challenge = Challenge()
mode = State.CHALLENGE
elif token.kind == TokenType.SPACE:
# A space means it must be followed by param list
mode = State.AUTH_PARAM
else:
raise ValueError(token)
elif mode == State.AUTH_PARAM:
if token.kind == TokenType.EOF:
raise ValueError(token)
elif token.kind == TokenType.AUTH_PARAM:
key, value = extract_auth_param(token.value)
current_challenge.params.append((key, value))
mode = State.NEXT_IN_LIST
else:
raise ValueError(token)
elif mode == State.NEXT_IN_LIST:
if token.kind == TokenType.EOF:
challenges.append(current_challenge)
break
elif token.kind == TokenType.COMMA:
mode = State.AUTH_PARAM_OR_SCHEME
else:
raise ValueError(token)
elif mode == State.AUTH_PARAM_OR_SCHEME:
if token.kind == TokenType.EOF:
raise ValueError(token)
elif token.kind == TokenType.TOKEN:
challenges.append(current_challenge)
current_challenge = Challenge(token.value)
mode = State.AUTH_PARAM_LIST_START
elif token.kind == TokenType.AUTH_PARAM:
key, value = extract_auth_param(token.value)
current_challenge.params.append((key, value))
mode = State.NEXT_IN_LIST
return challenges
class RealmServiceScope(NamedTuple):
realm: str
service: str
scope: str
class UsernamePassword(NamedTuple):
username: str
password: str
def get_bearer_challenge(challenges: List[Challenge]) -> Optional[RealmServiceScope]:
# Find a challenge that we can handle (currently only Bearer)
challenge = next((c for c in challenges if c.scheme == "Bearer"), None)
if challenge is None:
return None
# Get realm / service / scope from challenge
realm = next((v for k, v in challenge.params if k == "realm"), None)
service = next((v for k, v in challenge.params if k == "service"), None)
scope = next((v for k, v in challenge.params if k == "scope"), None)
if realm is None or service is None or scope is None:
return None
return RealmServiceScope(realm, service, scope)
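To make the handshake concrete, a short sketch of these two helpers on a made-up challenge:

```python
from spack.oci.opener import RealmServiceScope, get_bearer_challenge, parse_www_authenticate

challenges = parse_www_authenticate(
    'Bearer realm="https://auth.example.com/token",service="example.com",'
    'scope="repository:user/image:pull,push"'
)
assert get_bearer_challenge(challenges) == RealmServiceScope(
    realm="https://auth.example.com/token",
    service="example.com",
    scope="repository:user/image:pull,push",
)
```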
class OCIAuthHandler(urllib.request.BaseHandler):
def __init__(self, credentials_provider: Callable[[str], Optional[UsernamePassword]]):
"""
Args:
credentials_provider: A function that takes a domain and may return a UsernamePassword.
"""
self.credentials_provider = credentials_provider
# Cached bearer tokens for a given domain.
self.cached_tokens: Dict[str, str] = {}
def obtain_bearer_token(self, registry: str, challenge: RealmServiceScope, timeout) -> str:
# See https://docs.docker.com/registry/spec/auth/token/
query = urllib.parse.urlencode(
{"service": challenge.service, "scope": challenge.scope, "client_id": "spack"}
)
parsed = urllib.parse.urlparse(challenge.realm)._replace(
query=query, fragment="", params=""
)
# Don't send credentials over insecure transport.
if parsed.scheme != "https":
raise ValueError(
f"Cannot login to {registry} over insecure {parsed.scheme} connection"
)
request = Request(urllib.parse.urlunparse(parsed))
# I guess we shouldn't cache this, since we don't know
# the context in which it's used (may depend on config)
pair = self.credentials_provider(registry)
if pair is not None:
encoded = base64.b64encode(f"{pair.username}:{pair.password}".encode("utf-8")).decode(
"utf-8"
)
request.add_unredirected_header("Authorization", f"Basic {encoded}")
# Do a GET request.
response = self.parent.open(request, timeout=timeout)
# Read the response and parse the JSON
response_json = json.load(response)
# Get the token from the response
token = response_json["token"]
# Remember the last obtained token for this registry
# Note: we should probably take into account realm, service and scope
# so we can store multiple tokens for the same registry.
self.cached_tokens[registry] = token
return token
def https_request(self, req: Request):
# Eagerly add the bearer token to the request if no
# auth header is set yet, to avoid 401s in multiple
# requests to the same registry.
# Use has_header, not .headers, since there are two
# types of headers (redirected and unredirected)
if req.has_header("Authorization"):
return req
parsed = urllib.parse.urlparse(req.full_url)
token = self.cached_tokens.get(parsed.netloc)
if not token:
return req
req.add_unredirected_header("Authorization", f"Bearer {token}")
return req
def http_error_401(self, req: Request, fp, code, msg, headers):
# Login failed, avoid infinite recursion where we go back and
# forth between auth server and registry
if hasattr(req, "login_attempted"):
raise urllib.error.HTTPError(
req.full_url, code, f"Failed to login to {req.full_url}: {msg}", headers, fp
)
# On 401 Unauthorized, parse the WWW-Authenticate header
# to determine what authentication is required
if "WWW-Authenticate" not in headers:
raise urllib.error.HTTPError(
req.full_url,
code,
"Cannot login to registry, missing WWW-Authenticate header",
headers,
fp,
)
header_value = headers["WWW-Authenticate"]
try:
challenge = get_bearer_challenge(parse_www_authenticate(header_value))
except ValueError as e:
raise urllib.error.HTTPError(
req.full_url,
code,
f"Cannot login to registry, malformed WWW-Authenticate header: {header_value}",
headers,
fp,
) from e
# If there is no bearer challenge, we can't handle it
if not challenge:
raise urllib.error.HTTPError(
req.full_url,
code,
f"Cannot login to registry, unsupported authentication scheme: {header_value}",
headers,
fp,
)
# Get the token from the auth handler
try:
token = self.obtain_bearer_token(
registry=urllib.parse.urlparse(req.get_full_url()).netloc,
challenge=challenge,
timeout=req.timeout,
)
except ValueError as e:
raise urllib.error.HTTPError(
req.full_url,
code,
f"Cannot login to registry, failed to obtain bearer token: {e}",
headers,
fp,
) from e
# Add the token to the request
req.add_unredirected_header("Authorization", f"Bearer {token}")
setattr(req, "login_attempted", True)
return self.parent.open(req, timeout=req.timeout)
def credentials_from_mirrors(
domain: str, *, mirrors: Optional[Iterable[spack.mirror.Mirror]] = None
) -> Optional[UsernamePassword]:
"""Filter out OCI registry credentials from a list of mirrors."""
mirrors = mirrors or spack.mirror.MirrorCollection().values()
for mirror in mirrors:
# Prefer push credentials over fetch. Unlikely that those are different
# but our config format allows it.
for direction in ("push", "fetch"):
pair = mirror.get_access_pair(direction)
if pair is None:
continue
url = mirror.get_url(direction)
if not url.startswith("oci://"):
continue
try:
parsed = ImageReference.from_string(url[6:])
except ValueError:
continue
if parsed.domain == domain:
return UsernamePassword(*pair)
return None
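A sketch of the credential lookup, with a stand-in instead of a real `spack.mirror.Mirror` (only `get_access_pair` and `get_url` are used):

```python
from spack.oci.opener import UsernamePassword, credentials_from_mirrors


class FakeMirror:
    """Stand-in for spack.mirror.Mirror exposing just the two accessed methods."""

    def get_access_pair(self, direction):
        return ["user", "pass"] if direction == "push" else None

    def get_url(self, direction):
        return "oci://ghcr.io/user/image"


assert credentials_from_mirrors("ghcr.io", mirrors=[FakeMirror()]) == UsernamePassword(
    "user", "pass"
)
assert credentials_from_mirrors("registry.example.com", mirrors=[FakeMirror()]) is None
```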
def create_opener():
"""Create an opener that can handle OCI authentication."""
opener = urllib.request.OpenerDirector()
for handler in [
urllib.request.UnknownHandler(),
urllib.request.HTTPSHandler(),
spack.util.web.SpackHTTPDefaultErrorHandler(),
urllib.request.HTTPRedirectHandler(),
urllib.request.HTTPErrorProcessor(),
OCIAuthHandler(credentials_from_mirrors),
]:
opener.add_handler(handler)
return opener
def ensure_status(response: HTTPResponse, status: int):
"""Raise an error if the response status is not the expected one."""
if response.status == status:
return
raise urllib.error.HTTPError(
response.geturl(), response.status, response.reason, response.info(), None
)
def default_retry(f, retries: int = 3, sleep=None):
sleep = sleep or time.sleep
def wrapper(*args, **kwargs):
for i in range(retries):
try:
return f(*args, **kwargs)
except urllib.error.HTTPError as e:
# Retry on internal server errors, and rate limit errors
# Potentially this could take into account the Retry-After header
# if registries support it
if i + 1 != retries and (500 <= e.code < 600 or e.code == 429):
# Exponential backoff
sleep(2**i)
continue
raise
return wrapper
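And a sketch of the backoff behavior, with sleeps recorded rather than slept:

```python
import urllib.error

from spack.oci.opener import default_retry

sleeps = []
calls = []


def flaky():
    calls.append(None)
    if len(calls) < 3:  # fail twice with 429, succeed on the third call
        raise urllib.error.HTTPError("https://example.com", 429, "Too Many Requests", None, None)
    return "ok"


assert default_retry(flaky, retries=3, sleep=sleeps.append)() == "ok"
assert sleeps == [1, 2]  # exponential backoff: 2**0, then 2**1 seconds
```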


@@ -66,7 +66,6 @@
import spack.error
-import spack.spec
import spack.variant
import spack.version
IS_WINDOWS = sys.platform == "win32"
@@ -164,7 +163,7 @@ class Token:
__slots__ = "kind", "value", "start", "end"
def __init__(
-self, kind: TokenType, value: str, start: Optional[int] = None, end: Optional[int] = None
+self, kind: TokenBase, value: str, start: Optional[int] = None, end: Optional[int] = None
):
self.kind = kind
self.value = value
@@ -264,8 +263,8 @@ def tokens(self) -> List[Token]:
return list(filter(lambda x: x.kind != TokenType.WS, tokenize(self.literal_str)))
def next_spec(
-self, initial_spec: Optional[spack.spec.Spec] = None
-) -> Optional[spack.spec.Spec]:
+self, initial_spec: Optional["spack.spec.Spec"] = None
+) -> Optional["spack.spec.Spec"]:
"""Return the next spec parsed from text.
Args:
@@ -298,7 +297,7 @@ def next_spec(
return root_spec
-def all_specs(self) -> List[spack.spec.Spec]:
+def all_specs(self) -> List["spack.spec.Spec"]:
"""Return all the specs that remain to be parsed"""
return list(iter(self.next_spec, None))
@@ -313,7 +312,9 @@ def __init__(self, ctx):
self.has_compiler = False
self.has_version = False
-def parse(self, initial_spec: Optional[spack.spec.Spec] = None) -> Optional[spack.spec.Spec]:
+def parse(
+self, initial_spec: Optional["spack.spec.Spec"] = None
+) -> Optional["spack.spec.Spec"]:
"""Parse a single spec node from a stream of tokens
Args:
@@ -414,7 +415,7 @@ class FileParser:
def __init__(self, ctx):
self.ctx = ctx
-def parse(self, initial_spec: spack.spec.Spec) -> spack.spec.Spec:
+def parse(self, initial_spec: "spack.spec.Spec") -> "spack.spec.Spec":
"""Parse a spec tree from a specfile.
Args:
@@ -437,7 +438,7 @@ def parse(self, initial_spec: spack.spec.Spec) -> spack.spec.Spec:
return initial_spec
-def parse(text: str) -> List[spack.spec.Spec]:
+def parse(text: str) -> List["spack.spec.Spec"]:
"""Parse text into a list of strings
Args:
@@ -450,8 +451,8 @@ def parse(text: str) -> List[spack.spec.Spec]:
def parse_one_or_raise(
-text: str, initial_spec: Optional[spack.spec.Spec] = None
-) -> spack.spec.Spec:
+text: str, initial_spec: Optional["spack.spec.Spec"] = None
+) -> "spack.spec.Spec":
"""Parse exactly one spec from text and return it, or raise
Args:


@@ -75,6 +75,7 @@
import spack.deptypes as dt
import spack.error
import spack.hash_types as ht
+import spack.parser
import spack.patch
import spack.paths
import spack.platforms
@@ -1318,8 +1319,6 @@ def __init__(
self.external_path = external_path
self.external_module = external_module
"""
-import spack.parser
# Copy if spec_like is a Spec.
if isinstance(spec_like, Spec):
self._dup(spec_like)


@@ -37,6 +37,7 @@
import spack.fetch_strategy as fs
import spack.mirror
import spack.paths
+import spack.resource
import spack.spec
import spack.stage
import spack.util.lock
@@ -455,6 +456,7 @@ def fetch(self, mirror_only=False, err_msg=None):
mirror_urls = [
url_util.join(mirror.fetch_url, rel_path)
for mirror in spack.mirror.MirrorCollection(source=True).values()
+if not mirror.fetch_url.startswith("oci://")
for rel_path in self.mirror_paths
]
@@ -658,8 +660,14 @@ def destroy(self):
class ResourceStage(Stage):
-def __init__(self, url_or_fetch_strategy, root, resource, **kwargs):
-super().__init__(url_or_fetch_strategy, **kwargs)
+def __init__(
+self,
+fetch_strategy: fs.FetchStrategy,
+root: Stage,
+resource: spack.resource.Resource,
+**kwargs,
+):
+super().__init__(fetch_strategy, **kwargs)
self.root_stage = root
self.resource = resource


@@ -326,4 +326,8 @@ def fake_push(node, push_url, options):
buildcache(*buildcache_create_args)
-assert packages_to_push == expected
+# Order is not guaranteed, so we can't just compare lists
+assert set(packages_to_push) == set(expected)
+# Ensure no duplicates
+assert len(set(packages_to_push)) == len(packages_to_push)


@@ -31,6 +31,7 @@
import spack.binary_distribution
import spack.caches
+import spack.cmd.buildcache
import spack.compilers
import spack.config
import spack.database
@@ -1948,3 +1949,21 @@ def pytest_runtest_setup(item):
not_on_windows_marker = item.get_closest_marker(name="not_on_windows")
if not_on_windows_marker and sys.platform == "win32":
pytest.skip(*not_on_windows_marker.args)
@pytest.fixture(scope="function")
def disable_parallel_buildcache_push(monkeypatch):
class MockPool:
def map(self, func, args):
return [func(a) for a in args]
def starmap(self, func, args):
return [func(*a) for a in args]
def __enter__(self):
return self
def __exit__(self, *args):
pass
monkeypatch.setattr(spack.cmd.buildcache, "_make_pool", MockPool)


@@ -0,0 +1,101 @@
# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import re
import pytest
import spack.spec
from spack.oci.image import Digest, ImageReference, default_tag, tag
@pytest.mark.parametrize(
"image_ref, expected",
[
(
f"example.com:1234/a/b/c:tag@sha256:{'a'*64}",
("example.com:1234", "a/b/c", "tag", Digest.from_sha256("a" * 64)),
),
("example.com:1234/a/b/c:tag", ("example.com:1234", "a/b/c", "tag", None)),
("example.com:1234/a/b/c", ("example.com:1234", "a/b/c", "latest", None)),
(
f"example.com:1234/a/b/c@sha256:{'a'*64}",
("example.com:1234", "a/b/c", "latest", Digest.from_sha256("a" * 64)),
),
# ipv4
("1.2.3.4:1234/a/b/c:tag", ("1.2.3.4:1234", "a/b/c", "tag", None)),
# ipv6
("[2001:db8::1]:1234/a/b/c:tag", ("[2001:db8::1]:1234", "a/b/c", "tag", None)),
# Follow docker rules for parsing
("ubuntu:22.04", ("index.docker.io", "library/ubuntu", "22.04", None)),
("myname/myimage:abc", ("index.docker.io", "myname/myimage", "abc", None)),
("myname:1234/myimage:abc", ("myname:1234", "myimage", "abc", None)),
("localhost/myimage:abc", ("localhost", "myimage", "abc", None)),
("localhost:1234/myimage:abc", ("localhost:1234", "myimage", "abc", None)),
],
)
def test_name_parsing(image_ref, expected):
x = ImageReference.from_string(image_ref)
assert (x.domain, x.name, x.tag, x.digest) == expected
@pytest.mark.parametrize(
"image_ref",
[
# wrong order of tag and sha
f"example.com:1234/a/b/c@sha256:{'a'*64}:tag",
# double tag
"example.com:1234/a/b/c:tag:tag",
# empty tag
"example.com:1234/a/b/c:",
# empty digest
"example.com:1234/a/b/c@sha256:",
# unsupported digest algorithm
f"example.com:1234/a/b/c@sha512:{'a'*128}",
# invalid digest length
f"example.com:1234/a/b/c@sha256:{'a'*63}",
# whitespace
"example.com:1234/a/b/c :tag",
"example.com:1234/a/b/c: tag",
"example.com:1234/a/b/c:tag ",
" example.com:1234/a/b/c:tag",
# broken ipv4
"1.2..3:1234/a/b/c:tag",
],
)
def test_parsing_failure(image_ref):
with pytest.raises(ValueError):
ImageReference.from_string(image_ref)
def test_digest():
valid_digest = "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
# Test string roundtrip
assert str(Digest.from_string(f"sha256:{valid_digest}")) == f"sha256:{valid_digest}"
# Invalid digest length
with pytest.raises(ValueError):
Digest.from_string("sha256:abcdef")
# Missing algorithm
with pytest.raises(ValueError):
Digest.from_string(valid_digest)
@pytest.mark.parametrize(
"spec",
[
# Standard case
"short-name@=1.2.3",
# Unsupported characters in git version
f"git-version@{1:040x}=develop",
# Too long of a name
f"{'too-long':x<256}@=1.2.3",
],
)
def test_default_tag(spec: str):
"""Make sure that computed image tags are valid."""
assert re.fullmatch(tag, default_tag(spack.spec.Spec(spec)))


@@ -0,0 +1,148 @@
# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
# These are slow integration tests that do concretization, install, tarballing
# and compression. They still use an in-memory OCI registry.
import hashlib
import json
import os
from contextlib import contextmanager
import spack.oci.opener
from spack.binary_distribution import gzip_compressed_tarfile
from spack.main import SpackCommand
from spack.oci.image import Digest, ImageReference, default_config, default_manifest
from spack.oci.oci import blob_exists, get_manifest_and_config, upload_blob, upload_manifest
from spack.test.oci.mock_registry import DummyServer, InMemoryOCIRegistry, create_opener
buildcache = SpackCommand("buildcache")
mirror = SpackCommand("mirror")
@contextmanager
def oci_servers(*servers: DummyServer):
old_opener = spack.oci.opener.urlopen
spack.oci.opener.urlopen = create_opener(*servers).open
try:
yield
finally:
spack.oci.opener.urlopen = old_opener
def test_buildcache_push_command(mutable_database, disable_parallel_buildcache_push):
with oci_servers(InMemoryOCIRegistry("example.com")):
mirror("add", "oci-test", "oci://example.com/image")
# Push the package(s) to the OCI registry
buildcache("push", "--update-index", "oci-test", "mpileaks^mpich")
# Remove mpileaks from the database
matches = mutable_database.query_local("mpileaks^mpich")
assert len(matches) == 1
spec = matches[0]
spec.package.do_uninstall()
# Reinstall mpileaks from the OCI registry
buildcache("install", "--unsigned", "mpileaks^mpich")
# Now it should be installed again
assert spec.installed
# And let's check that the bin/mpileaks executable is there
assert os.path.exists(os.path.join(spec.prefix, "bin", "mpileaks"))
def test_buildcache_push_with_base_image_command(
mutable_database, tmpdir, disable_parallel_buildcache_push
):
"""Test that we can push a package with a base image to an OCI registry.
This test is a bit involved, because we have to create a small base image."""
registry_src = InMemoryOCIRegistry("src.example.com")
registry_dst = InMemoryOCIRegistry("dst.example.com")
base_image = ImageReference.from_string("src.example.com/my-base-image:latest")
with oci_servers(registry_src, registry_dst):
mirror("add", "oci-test", "oci://dst.example.com/image")
# TODO: simplify creation of images...
# We create a rootfs.tar.gz, a config file and a manifest file,
# and upload those.
config, manifest = default_config(architecture="amd64", os="linux"), default_manifest()
# Create a small rootfs
rootfs = tmpdir.join("rootfs")
rootfs.ensure(dir=True)
rootfs.join("bin").ensure(dir=True)
rootfs.join("bin", "sh").ensure(file=True)
# Create a tarball of it.
tarball = tmpdir.join("base.tar.gz")
with gzip_compressed_tarfile(tarball) as (tar, tar_gz_checksum, tar_checksum):
tar.add(rootfs, arcname=".")
tar_gz_digest = Digest.from_sha256(tar_gz_checksum.hexdigest())
tar_digest = Digest.from_sha256(tar_checksum.hexdigest())
# Save the config file
config["rootfs"]["diff_ids"] = [str(tar_digest)]
config_file = tmpdir.join("config.json")
with open(config_file, "w") as f:
f.write(json.dumps(config))
config_digest = Digest.from_sha256(
hashlib.sha256(open(config_file, "rb").read()).hexdigest()
)
# Register the layer in the manifest
manifest["layers"].append(
{
"mediaType": "application/vnd.oci.image.layer.v1.tar+gzip",
"digest": str(tar_gz_digest),
"size": tarball.size(),
}
)
manifest["config"]["digest"] = str(config_digest)
manifest["config"]["size"] = config_file.size()
# Upload the layer and config file
upload_blob(base_image, tarball, tar_gz_digest)
upload_blob(base_image, config_file, config_digest)
# Upload the manifest
upload_manifest(base_image, manifest)
# END TODO
# Finally... use it as a base image
buildcache("push", "--base-image", str(base_image), "oci-test", "mpileaks^mpich")
# Figure out what tag was produced
tag = next(tag for _, tag in registry_dst.manifests.keys() if tag.startswith("mpileaks-"))
assert tag is not None
# Fetch the manifest and config
dst_image = ImageReference.from_string(f"dst.example.com/image:{tag}")
retrieved_manifest, retrieved_config = get_manifest_and_config(dst_image)
# Check that the base image layer is first.
assert retrieved_manifest["layers"][0]["digest"] == str(tar_gz_digest)
assert retrieved_config["rootfs"]["diff_ids"][0] == str(tar_digest)
# And also check that we have layers for each link-run dependency
matches = mutable_database.query_local("mpileaks^mpich")
assert len(matches) == 1
spec = matches[0]
num_runtime_deps = len(list(spec.traverse(root=True, deptype=("link", "run"))))
# One base layer + num_runtime_deps
assert len(retrieved_manifest["layers"]) == 1 + num_runtime_deps
# And verify that all layers including the base layer are present
for layer in retrieved_manifest["layers"]:
assert blob_exists(dst_image, digest=Digest.from_string(layer["digest"]))


@@ -0,0 +1,410 @@
# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import base64
import email.message
import hashlib
import io
import json
import re
import urllib.error
import urllib.parse
import urllib.request
import uuid
from typing import Callable, Dict, List, Optional, Pattern, Tuple
from urllib.request import Request
from spack.oci.image import Digest
from spack.oci.opener import OCIAuthHandler
class MockHTTPResponse(io.IOBase):
"""This is a mock HTTP response, which implements part of http.client.HTTPResponse"""
def __init__(self, status, reason, headers=None, body=None):
self.msg = None
self.version = 11
self.url = None
self.headers = email.message.EmailMessage()
self.status = status
self.code = status
self.reason = reason
self.debuglevel = 0
self._body = body
if headers is not None:
for key, value in headers.items():
self.headers[key] = value
@classmethod
def with_json(cls, status, reason, headers=None, body=None):
"""Create a mock HTTP response with JSON string as body"""
body = io.BytesIO(json.dumps(body).encode("utf-8"))
return cls(status, reason, headers, body)
def read(self, *args, **kwargs):
return self._body.read(*args, **kwargs)
def getheader(self, name, default=None):
return self.headers.get(name, default)
def getheaders(self):
return self.headers.items()
def fileno(self):
return 0
def getcode(self):
return self.status
def info(self):
return self.headers
class MiddlewareError(Exception):
"""Thrown in a handler to return a response early."""
def __init__(self, response: MockHTTPResponse):
self.response = response
class Router:
"""This class is a small router for requests to the OCI registry.
It is used to dispatch requests to a handler, and middleware can be
used to transform requests, as well as return responses early
(e.g. for authentication)."""
def __init__(self) -> None:
self.routes: List[Tuple[str, Pattern, Callable]] = []
self.middleware: List[Callable[[Request], Request]] = []
def handle(self, req: Request) -> MockHTTPResponse:
"""Dispatch a request to a handler."""
result = urllib.parse.urlparse(req.full_url)
# Apply middleware
try:
for handler in self.middleware:
req = handler(req)
except MiddlewareError as e:
return e.response
for method, path_regex, handler in self.routes:
if method != req.get_method():
continue
match = re.fullmatch(path_regex, result.path)
if not match:
continue
return handler(req, **match.groupdict())
return MockHTTPResponse(404, "Not found")
def register(self, method, path: str, handler: Callable):
self.routes.append((method, re.compile(path), handler))
def add_middleware(self, handler: Callable[[Request], Request]):
self.middleware.append(handler)
class DummyServer:
def __init__(self, domain: str) -> None:
# The domain of the server, e.g. "registry.example.com"
self.domain = domain
# List of (method, url) tuples
self.requests: List[Tuple[str, str]] = []
# Dispatches requests to handlers
self.router = Router()
# Always install a request logger
self.router.add_middleware(self.log_request)
def handle(self, req: Request) -> MockHTTPResponse:
return self.router.handle(req)
def log_request(self, req: Request):
path = urllib.parse.urlparse(req.full_url).path
self.requests.append((req.get_method(), path))
return req
def clear_log(self):
self.requests = []
class InMemoryOCIRegistry(DummyServer):
"""This implements the basic OCI registry API, but in memory.
It supports two types of blob uploads:
1. POST + PUT: the client first starts a session with POST, then does a large PUT request
2. POST: the client does a single POST request with the whole blob
Option 2 is not supported by all registries, so it can be disabled
with allow_single_post=False.
A third option is chunked upload, which is not implemented here, since it's
typically a major performance hit in upload speed, so Spack doesn't use it."""
def __init__(self, domain: str, allow_single_post: bool = True) -> None:
super().__init__(domain)
self.router.register("GET", r"/v2/", self.index)
self.router.register("HEAD", r"/v2/(?P<name>.+)/blobs/(?P<digest>.+)", self.head_blob)
self.router.register("POST", r"/v2/(?P<name>.+)/blobs/uploads/", self.start_session)
self.router.register("PUT", r"/upload", self.put_session)
self.router.register("PUT", r"/v2/(?P<name>.+)/manifests/(?P<ref>.+)", self.put_manifest)
self.router.register("GET", r"/v2/(?P<name>.+)/manifests/(?P<ref>.+)", self.get_manifest)
self.router.register("GET", r"/v2/(?P<name>.+)/blobs/(?P<digest>.+)", self.get_blob)
self.router.register("GET", r"/v2/(?P<name>.+)/tags/list", self.list_tags)
# If True, allow single POST upload, not all registries support this
self.allow_single_post = allow_single_post
# Used for POST + PUT upload. This is a map from session ID to image name
self.sessions: Dict[str, str] = {}
# Set of sha256:... digests that are known to the registry
self.blobs: Dict[str, bytes] = {}
# Map from (name, tag) to manifest
self.manifests: Dict[Tuple[str, str], Dict] = {}
def index(self, req: Request):
return MockHTTPResponse.with_json(200, "OK", body={})
def head_blob(self, req: Request, name: str, digest: str):
if digest in self.blobs:
return MockHTTPResponse(200, "OK", headers={"Content-Length": "1234"})
return MockHTTPResponse(404, "Not found")
def get_blob(self, req: Request, name: str, digest: str):
if digest in self.blobs:
return MockHTTPResponse(200, "OK", body=io.BytesIO(self.blobs[digest]))
return MockHTTPResponse(404, "Not found")
def start_session(self, req: Request, name: str):
id = str(uuid.uuid4())
self.sessions[id] = name
# Check if digest is present (single monolithic upload)
result = urllib.parse.urlparse(req.full_url)
query = urllib.parse.parse_qs(result.query)
if self.allow_single_post and "digest" in query:
return self.handle_upload(
req, name=name, digest=Digest.from_string(query["digest"][0])
)
return MockHTTPResponse(202, "Accepted", headers={"Location": f"/upload?uuid={id}"})
def put_session(self, req: Request):
# Do the upload.
result = urllib.parse.urlparse(req.full_url)
query = urllib.parse.parse_qs(result.query)
# uuid param should be preserved, and digest should be present
assert "uuid" in query and len(query["uuid"]) == 1
assert "digest" in query and len(query["digest"]) == 1
id = query["uuid"][0]
assert id in self.sessions
name, digest = self.sessions[id], Digest.from_string(query["digest"][0])
response = self.handle_upload(req, name=name, digest=digest)
# End the session
del self.sessions[id]
return response
def put_manifest(self, req: Request, name: str, ref: str):
# Python's urllib normalizes header names with str.capitalize(), hence "Content-type".
content_type = req.get_header("Content-type")
assert content_type in (
"application/vnd.oci.image.manifest.v1+json",
"application/vnd.oci.image.index.v1+json",
)
index_or_manifest = json.loads(self._require_data(req))
# Verify that we have all blobs (layers for manifest, manifests for index)
if content_type == "application/vnd.oci.image.manifest.v1+json":
for layer in index_or_manifest["layers"]:
assert layer["digest"] in self.blobs, "Missing blob while uploading manifest"
else:
for manifest in index_or_manifest["manifests"]:
assert (
name,
manifest["digest"],
) in self.manifests, "Missing manifest while uploading index"
self.manifests[(name, ref)] = index_or_manifest
return MockHTTPResponse(
201, "Created", headers={"Location": f"/v2/{name}/manifests/{ref}"}
)
def get_manifest(self, req: Request, name: str, ref: str):
if (name, ref) not in self.manifests:
return MockHTTPResponse(404, "Not found")
manifest_or_index = self.manifests[(name, ref)]
return MockHTTPResponse.with_json(
200,
"OK",
headers={"Content-type": manifest_or_index["mediaType"]},
body=manifest_or_index,
)
def _require_data(self, req: Request) -> bytes:
"""Extract request.data, it's type remains a mystery"""
assert req.data is not None
if hasattr(req.data, "read"):
return req.data.read()
elif isinstance(req.data, bytes):
return req.data
raise ValueError("req.data should be bytes or have a read() method")
def handle_upload(self, req: Request, name: str, digest: Digest):
"""Verify the digest, save the blob, return created status"""
data = self._require_data(req)
assert hashlib.sha256(data).hexdigest() == digest.digest
self.blobs[str(digest)] = data
return MockHTTPResponse(201, "Created", headers={"Location": f"/v2/{name}/blobs/{digest}"})
def list_tags(self, req: Request, name: str):
# List all tags, exclude digests.
tags = [_tag for _name, _tag in self.manifests.keys() if _name == name and ":" not in _tag]
tags.sort()
return MockHTTPResponse.with_json(200, "OK", body={"tags": tags})
class DummyServerUrllibHandler(urllib.request.BaseHandler):
"""Glue between urllib and DummyServer, routing requests to
the correct mock server for a given domain."""
def __init__(self) -> None:
self.servers: Dict[str, DummyServer] = {}
def add_server(self, domain: str, api: DummyServer):
self.servers[domain] = api
return self
def https_open(self, req: Request):
domain = urllib.parse.urlparse(req.full_url).netloc
if domain not in self.servers:
return MockHTTPResponse(404, "Not found")
return self.servers[domain].handle(req)
class InMemoryOCIRegistryWithAuth(InMemoryOCIRegistry):
"""This is another in-memory OCI registry, but it requires authentication."""
def __init__(
self, domain, token: Optional[str], realm: str, allow_single_post: bool = True
) -> None:
super().__init__(domain, allow_single_post)
self.token = token # token to accept
self.realm = realm # url to the authorization server
self.router.add_middleware(self.authenticate)
def authenticate(self, req: Request):
# Any request needs an Authorization header
authorization = req.get_header("Authorization")
if authorization is None:
raise MiddlewareError(self.unauthorized())
# Ensure that the token is correct
assert authorization.startswith("Bearer ")
token = authorization[7:]
if token != self.token:
raise MiddlewareError(self.unauthorized())
return req
def unauthorized(self):
return MockHTTPResponse(
401,
"Unauthorized",
{
"www-authenticate": f'Bearer realm="{self.realm}",'
f'service="{self.domain}",'
'scope="repository:spack-registry:pull,push"'
},
)
class MockBearerTokenServer(DummyServer):
"""Simulates a basic server that hands out bearer tokens
at the /login endpoint for the following services:
public.example.com, which doesn't require Basic Auth
private.example.com, which requires Basic Auth, with user:pass
"""
def __init__(self, domain: str) -> None:
super().__init__(domain)
self.router.register("GET", "/login", self.login)
def login(self, req: Request):
url = urllib.parse.urlparse(req.full_url)
query_params = urllib.parse.parse_qs(url.query)
# Verify query params, from the www-authenticate header
assert query_params["client_id"] == ["spack"]
assert len(query_params["service"]) == 1
assert query_params["scope"] == ["repository:spack-registry:pull,push"]
service = query_params["service"][0]
if service == "public.example.com":
return self.public_auth(req)
elif service == "private.example.com":
return self.private_auth(req)
return MockHTTPResponse(404, "Not found")
def public_auth(self, req: Request):
# No need to login with username and password for the public registry
assert req.get_header("Authorization") is None
return MockHTTPResponse.with_json(200, "OK", body={"token": "public_token"})
def private_auth(self, req: Request):
# For the private registry we need to login with username and password
auth_value = req.get_header("Authorization")
if (
auth_value is None
or not auth_value.startswith("Basic ")
or base64.b64decode(auth_value[6:]) != b"user:pass"
):
return MockHTTPResponse(401, "Unauthorized")
return MockHTTPResponse.with_json(200, "OK", body={"token": "private_token"})
def create_opener(*servers: DummyServer, credentials_provider=None):
"""Creates a mock opener, that can be used to fake requests to a list
of servers."""
opener = urllib.request.OpenerDirector()
handler = DummyServerUrllibHandler()
for server in servers:
handler.add_server(server.domain, server)
opener.add_handler(handler)
opener.add_handler(urllib.request.HTTPDefaultErrorHandler())
opener.add_handler(urllib.request.HTTPErrorProcessor())
if credentials_provider is not None:
opener.add_handler(OCIAuthHandler(credentials_provider))
return opener
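A minimal sketch of the mock plumbing in action, with no sockets involved:

```python
from spack.test.oci.mock_registry import InMemoryOCIRegistry, create_opener

opener = create_opener(InMemoryOCIRegistry("registry.example.com"))

# GET /v2/ is routed to InMemoryOCIRegistry.index, which returns 200 with {}
assert opener.open("https://registry.example.com/v2/").status == 200
```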


@@ -0,0 +1,672 @@
# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import hashlib
import json
import urllib.error
import urllib.parse
import urllib.request
from urllib.request import Request
import pytest
import spack.mirror
from spack.oci.image import Digest, ImageReference, default_config, default_manifest
from spack.oci.oci import (
copy_missing_layers,
get_manifest_and_config,
image_from_mirror,
upload_blob,
upload_manifest,
)
from spack.oci.opener import (
Challenge,
RealmServiceScope,
UsernamePassword,
credentials_from_mirrors,
default_retry,
get_bearer_challenge,
parse_www_authenticate,
)
from spack.test.oci.mock_registry import (
DummyServer,
DummyServerUrllibHandler,
InMemoryOCIRegistry,
InMemoryOCIRegistryWithAuth,
MiddlewareError,
MockBearerTokenServer,
MockHTTPResponse,
create_opener,
)
def test_parse_www_authenticate():
"""Test parsing of valid WWW-Authenticate header, check whether it's
decomposed into a list of challenges with correct scheme and parameters
according to RFC 7235 section 4.1"""
www_authenticate = 'Bearer realm="https://spack.io/authenticate",service="spack-registry",scope="repository:spack-registry:pull,push"'
assert parse_www_authenticate(www_authenticate) == [
Challenge(
"Bearer",
[
("realm", "https://spack.io/authenticate"),
("service", "spack-registry"),
("scope", "repository:spack-registry:pull,push"),
],
)
]
assert parse_www_authenticate("Bearer") == [Challenge("Bearer")]
assert parse_www_authenticate("MethodA, MethodB,MethodC") == [
Challenge("MethodA"),
Challenge("MethodB"),
Challenge("MethodC"),
]
assert parse_www_authenticate(
'Digest realm="Digest Realm", nonce="1234567890", algorithm=MD5, qop="auth"'
) == [
Challenge(
"Digest",
[
("realm", "Digest Realm"),
("nonce", "1234567890"),
("algorithm", "MD5"),
("qop", "auth"),
],
)
]
assert parse_www_authenticate(
r'Newauth realm="apps", type=1, title="Login to \"apps\"", Basic realm="simple"'
) == [
Challenge("Newauth", [("realm", "apps"), ("type", "1"), ("title", 'Login to "apps"')]),
Challenge("Basic", [("realm", "simple")]),
]
@pytest.mark.parametrize(
"invalid_str",
[
# Not comma separated
"SchemeA SchemeB SchemeC",
# Unexpected eof
"SchemeA, SchemeB, SchemeC, ",
# Invalid auth param or scheme
r"Scheme x=y, ",
# Unexpected eof
"Scheme key=",
# Invalid token
r'"Bearer"',
# Invalid token
r'Scheme"xyz"',
# No auth param
r"Scheme ",
],
)
def test_invalid_www_authenticate(invalid_str):
with pytest.raises(ValueError):
parse_www_authenticate(invalid_str)
def test_get_bearer_challenge():
"""Test extracting Bearer challenge from a list of challenges"""
# Only an incomplete bearer challenge, missing service and scope, not usable.
assert (
get_bearer_challenge(
[
Challenge("Bearer", [("realm", "https://spack.io/authenticate")]),
Challenge("Basic", [("realm", "simple")]),
Challenge(
"Digest",
[
("realm", "Digest Realm"),
("nonce", "1234567890"),
("algorithm", "MD5"),
("qop", "auth"),
],
),
]
)
is None
)
# Multiple challenges, should pick the bearer one.
assert get_bearer_challenge(
[
Challenge(
"Dummy",
[("realm", "https://example.com/"), ("service", "service"), ("scope", "scope")],
),
Challenge(
"Bearer",
[
("realm", "https://spack.io/authenticate"),
("service", "spack-registry"),
("scope", "repository:spack-registry:pull,push"),
],
),
]
) == RealmServiceScope(
"https://spack.io/authenticate", "spack-registry", "repository:spack-registry:pull,push"
)
@pytest.mark.parametrize(
"image_ref,token",
[
("public.example.com/spack-registry:latest", "public_token"),
("private.example.com/spack-registry:latest", "private_token"),
],
)
def test_automatic_oci_authentication(image_ref, token):
image = ImageReference.from_string(image_ref)
def credentials_provider(domain: str):
return UsernamePassword("user", "pass") if domain == "private.example.com" else None
opener = create_opener(
InMemoryOCIRegistryWithAuth(
image.domain, token=token, realm="https://auth.example.com/login"
),
MockBearerTokenServer("auth.example.com"),
credentials_provider=credentials_provider,
)
# Run this twice: the second request exercises the code path that reuses the cached bearer token
assert opener.open(image.endpoint()).status == 200
assert opener.open(image.endpoint()).status == 200
def test_wrong_credentials():
"""Test that when wrong credentials are rejected by the auth server, we
get a 401 error."""
credentials_provider = lambda domain: UsernamePassword("wrong", "wrong")
image = ImageReference.from_string("private.example.com/image")
opener = create_opener(
InMemoryOCIRegistryWithAuth(
image.domain, token="something", realm="https://auth.example.com/login"
),
MockBearerTokenServer("auth.example.com"),
credentials_provider=credentials_provider,
)
with pytest.raises(urllib.error.HTTPError) as e:
opener.open(image.endpoint())
assert e.value.getcode() == 401
def test_wrong_bearer_token_returned_by_auth_server():
"""When the auth server returns a wrong bearer token, we should get a 401 error
when the request we attempt fails. We shouldn't go in circles getting a 401 from
the registry, then a non-working token from the auth server, then a 401 from the
registry, etc."""
image = ImageReference.from_string("private.example.com/image")
opener = create_opener(
InMemoryOCIRegistryWithAuth(
image.domain,
token="other_token_than_token_server_provides",
realm="https://auth.example.com/login",
),
MockBearerTokenServer("auth.example.com"),
credentials_provider=lambda domain: UsernamePassword("user", "pass"),
)
with pytest.raises(urllib.error.HTTPError) as e:
opener.open(image.endpoint())
assert e.value.getcode() == 401
class TrivialAuthServer(DummyServer):
"""A trivial auth server that hands out a bearer token at GET /login."""
def __init__(self, domain: str, token: str) -> None:
super().__init__(domain)
self.router.register("GET", "/login", self.login)
self.token = token
def login(self, req: Request):
return MockHTTPResponse.with_json(200, "OK", body={"token": self.token})
def test_registry_with_short_lived_bearer_tokens():
"""An issued bearer token is mostly opaque to the client, but typically
it embeds a short-lived expiration date. To speed up requests to a registry,
it's good not to authenticate on every request, but to cache the bearer token;
however, we then have to deal with the case of an expired bearer token.
Here we test that when the bearer token expires, we authenticate again, and
when the token is still valid, we don't re-authenticate."""
image = ImageReference.from_string("private.example.com/image")
credentials_provider = lambda domain: UsernamePassword("user", "pass")
auth_server = TrivialAuthServer("auth.example.com", token="token")
registry_server = InMemoryOCIRegistryWithAuth(
image.domain, token="token", realm="https://auth.example.com/login"
)
urlopen = create_opener(
registry_server, auth_server, credentials_provider=credentials_provider
).open
# First request, should work with token "token"
assert urlopen(image.endpoint()).status == 200
# Invalidate the token on the registry
registry_server.token = "new_token"
auth_server.token = "new_token"
# Second request: reusing the cached token should fail
# but in the background we will get a new token from the auth server
assert urlopen(image.endpoint()).status == 200
# Subsequent requests should work with the same token, let's do two more
assert urlopen(image.endpoint()).status == 200
assert urlopen(image.endpoint()).status == 200
    # And finally, we should see that we've issued exactly two requests to the auth server
assert auth_server.requests == [("GET", "/login"), ("GET", "/login")]
# Whereas we've done more requests to the registry
assert registry_server.requests == [
("GET", "/v2/"), # 1: without bearer token
("GET", "/v2/"), # 2: retry with bearer token
("GET", "/v2/"), # 3: with incorrect bearer token
("GET", "/v2/"), # 4: retry with new bearer token
("GET", "/v2/"), # 5: with recyled correct bearer token
("GET", "/v2/"), # 6: with recyled correct bearer token
]
class InMemoryRegistryWithUnsupportedAuth(InMemoryOCIRegistry):
"""A registry that does set a WWW-Authenticate header, but
with a challenge we don't support."""
def __init__(self, domain: str, allow_single_post: bool = True, www_authenticate=None) -> None:
self.www_authenticate = www_authenticate
super().__init__(domain, allow_single_post)
self.router.add_middleware(self.unsupported_auth_method)
def unsupported_auth_method(self, req: Request):
headers = {}
if self.www_authenticate:
headers["WWW-Authenticate"] = self.www_authenticate
raise MiddlewareError(MockHTTPResponse(401, "Unauthorized", headers=headers))
@pytest.mark.parametrize(
"www_authenticate,error_message",
[
# missing service and scope
('Bearer realm="https://auth.example.com/login"', "unsupported authentication scheme"),
# we don't do basic auth
('Basic realm="https://auth.example.com/login"', "unsupported authentication scheme"),
# multiple unsupported challenges
(
"CustomChallenge method=unsupported, OtherChallenge method=x,param=y",
"unsupported authentication scheme",
),
# no challenge
(None, "missing WWW-Authenticate header"),
# malformed challenge, missing quotes
("Bearer realm=https://auth.example.com", "malformed WWW-Authenticate header"),
# http instead of https
('Bearer realm="http://auth.example.com",scope=x,service=y', "insecure http connection"),
],
)
def test_auth_method_we_cannot_handle_is_error(www_authenticate, error_message):
# We can only handle WWW-Authenticate with a Bearer challenge
image = ImageReference.from_string("private.example.com/image")
urlopen = create_opener(
InMemoryRegistryWithUnsupportedAuth(image.domain, www_authenticate=www_authenticate),
TrivialAuthServer("auth.example.com", token="token"),
credentials_provider=lambda domain: UsernamePassword("user", "pass"),
).open
with pytest.raises(urllib.error.HTTPError, match=error_message) as e:
urlopen(image.endpoint())
assert e.value.getcode() == 401
# Parametrize over single POST vs POST + PUT.
@pytest.mark.parametrize("client_single_request", [True, False])
@pytest.mark.parametrize("server_single_request", [True, False])
def test_oci_registry_upload(tmpdir, client_single_request, server_single_request):
opener = urllib.request.OpenerDirector()
opener.add_handler(
DummyServerUrllibHandler().add_server(
"example.com", InMemoryOCIRegistry(server_single_request)
)
)
opener.add_handler(urllib.request.HTTPDefaultErrorHandler())
opener.add_handler(urllib.request.HTTPErrorProcessor())
# Create a small blob
blob = tmpdir.join("blob")
blob.write("Hello world!")
image = ImageReference.from_string("example.com/image:latest")
digest = Digest.from_sha256(hashlib.sha256(blob.read_binary()).hexdigest())
    # Set small_file_size larger than the blob iff we're doing a single request
small_file_size = 1024 if client_single_request else 0
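    # Blobs smaller than small_file_size go up in a single POST carrying the
    # digest; larger blobs use POST (initiate upload session) + PUT (data).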
# Upload once, should actually upload
assert upload_blob(
ref=image,
file=blob.strpath,
digest=digest,
small_file_size=small_file_size,
_urlopen=opener.open,
)
    # Second time should be a no-op, since the blob already exists
assert not upload_blob(
ref=image,
file=blob.strpath,
digest=digest,
small_file_size=small_file_size,
_urlopen=opener.open,
)
# Force upload should upload again
assert upload_blob(
ref=image,
file=blob.strpath,
digest=digest,
force=True,
small_file_size=small_file_size,
_urlopen=opener.open,
)
def test_copy_missing_layers(tmpdir, config):
"""Test copying layers from one registry to another.
Creates 3 blobs, 1 config and 1 manifest in registry A
and copies layers to registry B. Then checks that all
layers are present in registry B. Finally it runs the copy
again and checks that no new layers are uploaded."""
# NOTE: config fixture is used to disable default source mirrors
# which are used in Stage(...). Otherwise this test doesn't really
# rely on globals.
src = ImageReference.from_string("a.example.com/image:x")
dst = ImageReference.from_string("b.example.com/image:y")
src_registry = InMemoryOCIRegistry(src.domain)
dst_registry = InMemoryOCIRegistry(dst.domain)
urlopen = create_opener(src_registry, dst_registry).open
    # TODO: make it a bit easier to create a bunch of blobs + config + manifest?
# Create a few blobs and a config file
blobs = [tmpdir.join(f"blob{i}") for i in range(3)]
for i, blob in enumerate(blobs):
blob.write(f"Blob {i}")
digests = [
Digest.from_sha256(hashlib.sha256(blob.read_binary()).hexdigest()) for blob in blobs
]
config = default_config(architecture="amd64", os="linux")
configfile = tmpdir.join("config.json")
configfile.write(json.dumps(config))
config_digest = Digest.from_sha256(hashlib.sha256(configfile.read_binary()).hexdigest())
for blob, digest in zip(blobs, digests):
upload_blob(src, blob.strpath, digest, _urlopen=urlopen)
upload_blob(src, configfile.strpath, config_digest, _urlopen=urlopen)
# Then create a manifest referencing them
manifest = default_manifest()
for blob, digest in zip(blobs, digests):
manifest["layers"].append(
{
"mediaType": "application/vnd.oci.image.layer.v1.tar+gzip",
"digest": str(digest),
"size": blob.size(),
}
)
manifest["config"] = {
"mediaType": "application/vnd.oci.image.config.v1+json",
"digest": str(config_digest),
"size": configfile.size(),
}
upload_manifest(src, manifest, _urlopen=urlopen)
# Finally, copy the image from src to dst
copy_missing_layers(src, dst, architecture="amd64", _urlopen=urlopen)
    # Check that all layers (but not the config) were copied and are identical
assert len(dst_registry.blobs) == len(blobs)
for blob, digest in zip(blobs, digests):
assert dst_registry.blobs.get(str(digest)) == blob.read_binary()
is_upload = lambda method, path: method == "POST" and path == "/v2/image/blobs/uploads/"
is_exists = lambda method, path: method == "HEAD" and path.startswith("/v2/image/blobs/")
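    # Per the OCI distribution spec, POST /v2/<name>/blobs/uploads/ initiates
    # an upload session, and HEAD on a blob URL is the existence check.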
# Check that exactly 3 uploads were initiated, and that we don't do
# double existence checks when uploading.
assert sum(is_upload(method, path) for method, path in dst_registry.requests) == 3
assert sum(is_exists(method, path) for method, path in dst_registry.requests) == 3
# Check that re-uploading skips existing layers.
dst_registry.clear_log()
copy_missing_layers(src, dst, architecture="amd64", _urlopen=urlopen)
# Check that no uploads were initiated, only existence checks were done.
assert sum(is_upload(method, path) for method, path in dst_registry.requests) == 0
assert sum(is_exists(method, path) for method, path in dst_registry.requests) == 3
def test_image_from_mirror():
mirror = spack.mirror.Mirror("oci://example.com/image")
assert image_from_mirror(mirror) == ImageReference.from_string("example.com/image")
def test_image_reference_str():
"""Test that with_digest() works with Digest and str."""
digest_str = f"sha256:{1234:064x}"
digest = Digest.from_string(digest_str)
img = ImageReference.from_string("example.com/image")
assert str(img.with_digest(digest)) == f"example.com/image:latest@{digest}"
assert str(img.with_digest(digest_str)) == f"example.com/image:latest@{digest}"
assert str(img.with_tag("hello")) == "example.com/image:hello"
assert str(img.with_tag("hello").with_digest(digest)) == f"example.com/image:hello@{digest}"
@pytest.mark.parametrize(
"image",
[
# white space issue
" example.com/image",
# not alpha-numeric
"hello#world:latest",
],
)
def test_image_reference_invalid(image):
with pytest.raises(ValueError, match="Invalid image reference"):
ImageReference.from_string(image)
def test_default_credentials_provider():
"""The default credentials provider uses a collection of configured
mirrors."""
mirrors = [
# OCI mirror with push credentials
spack.mirror.Mirror(
{"url": "oci://a.example.com/image", "push": {"access_pair": ["user.a", "pass.a"]}}
),
# Not an OCI mirror
spack.mirror.Mirror(
{"url": "https://b.example.com/image", "access_pair": ["user.b", "pass.b"]}
),
# No credentials
spack.mirror.Mirror("oci://c.example.com/image"),
# Top-level credentials
spack.mirror.Mirror(
{"url": "oci://d.example.com/image", "access_pair": ["user.d", "pass.d"]}
),
# Dockerhub short reference
spack.mirror.Mirror(
{"url": "oci://user/image", "access_pair": ["dockerhub_user", "dockerhub_pass"]}
),
# Localhost (not a dockerhub short reference)
spack.mirror.Mirror(
{"url": "oci://localhost/image", "access_pair": ["user.localhost", "pass.localhost"]}
),
]
assert credentials_from_mirrors("a.example.com", mirrors=mirrors) == UsernamePassword(
"user.a", "pass.a"
)
assert credentials_from_mirrors("b.example.com", mirrors=mirrors) is None
assert credentials_from_mirrors("c.example.com", mirrors=mirrors) is None
assert credentials_from_mirrors("d.example.com", mirrors=mirrors) == UsernamePassword(
"user.d", "pass.d"
)
assert credentials_from_mirrors("index.docker.io", mirrors=mirrors) == UsernamePassword(
"dockerhub_user", "dockerhub_pass"
)
assert credentials_from_mirrors("localhost", mirrors=mirrors) == UsernamePassword(
"user.localhost", "pass.localhost"
)
def test_manifest_index(tmpdir):
"""Test obtaining manifest + config from a registry
that has an index"""
urlopen = create_opener(InMemoryOCIRegistry("registry.example.com")).open
img = ImageReference.from_string("registry.example.com/image")
# Create two config files and manifests, for different architectures
manifest_descriptors = []
manifest_and_config = {}
for arch in ("amd64", "arm64"):
file = tmpdir.join(f"config_{arch}.json")
config = default_config(architecture=arch, os="linux")
file.write(json.dumps(config))
config_digest = Digest.from_sha256(hashlib.sha256(file.read_binary()).hexdigest())
assert upload_blob(img, file, config_digest, _urlopen=urlopen)
manifest = {
"schemaVersion": 2,
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"config": {
"mediaType": "application/vnd.oci.image.config.v1+json",
"digest": str(config_digest),
"size": file.size(),
},
"layers": [],
}
manifest_digest, manifest_size = upload_manifest(
img, manifest, tag=False, _urlopen=urlopen
)
manifest_descriptors.append(
{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"platform": {"architecture": arch, "os": "linux"},
"digest": str(manifest_digest),
"size": manifest_size,
}
)
manifest_and_config[arch] = (manifest, config)
# And a single index.
index = {
"schemaVersion": 2,
"mediaType": "application/vnd.oci.image.index.v1+json",
"manifests": manifest_descriptors,
}
upload_manifest(img, index, tag=True, _urlopen=urlopen)
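    # Note: the per-arch manifests above were pushed without a tag (addressable
    # by digest only); tagging just the index makes clients resolve
    # tag -> index -> arch-specific manifest.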
    # Check that we fetch the correct manifest and config for each architecture
for arch in ("amd64", "arm64"):
assert (
get_manifest_and_config(img, architecture=arch, _urlopen=urlopen)
== manifest_and_config[arch]
)
# Also test max recursion
with pytest.raises(Exception, match="Maximum recursion depth reached"):
get_manifest_and_config(img, architecture="amd64", recurse=0, _urlopen=urlopen)
class BrokenServer(DummyServer):
"""Dummy server that returns 500 and 429 errors twice before succeeding"""
def __init__(self, domain: str) -> None:
super().__init__(domain)
self.router.register("GET", r"/internal-server-error/", self.internal_server_error_twice)
self.router.register("GET", r"/rate-limit/", self.rate_limit_twice)
self.router.register("GET", r"/not-found/", self.not_found)
self.count_500 = 0
self.count_429 = 0
def internal_server_error_twice(self, request: Request):
self.count_500 += 1
if self.count_500 < 3:
return MockHTTPResponse(500, "Internal Server Error")
else:
return MockHTTPResponse(200, "OK")
def rate_limit_twice(self, request: Request):
self.count_429 += 1
if self.count_429 < 3:
return MockHTTPResponse(429, "Rate Limit Exceeded")
else:
return MockHTTPResponse(200, "OK")
def not_found(self, request: Request):
return MockHTTPResponse(404, "Not Found")
@pytest.mark.parametrize(
"url,max_retries,expect_failure,expect_requests",
[
# 500s should be retried
("https://example.com/internal-server-error/", 2, True, 2),
("https://example.com/internal-server-error/", 5, False, 3),
# 429s should be retried
("https://example.com/rate-limit/", 2, True, 2),
("https://example.com/rate-limit/", 5, False, 3),
# 404s shouldn't be retried
("https://example.com/not-found/", 3, True, 1),
],
)
def test_retry(url, max_retries, expect_failure, expect_requests):
server = BrokenServer("example.com")
urlopen = create_opener(server).open
sleep_time = []
dont_sleep = lambda t: sleep_time.append(t) # keep track of sleep times
try:
response = default_retry(urlopen, retries=max_retries, sleep=dont_sleep)(url)
except urllib.error.HTTPError as e:
if not expect_failure:
assert False, f"Unexpected HTTPError: {e}"
else:
if expect_failure:
assert False, "Expected HTTPError, but none was raised"
assert response.status == 200
assert len(server.requests) == expect_requests
assert sleep_time == [2**i for i in range(expect_requests - 1)]
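
For reference, the sleep-time assertion above encodes exponential backoff (2**0, 2**1, ... seconds between attempts). Below is a minimal sketch of a retry wrapper with that schedule, assuming a urlopen-like callable that raises `urllib.error.HTTPError` on failure; `retry_with_backoff` is a hypothetical name, not Spack's `default_retry`:

```python
import time
import urllib.error
import urllib.request


def retry_with_backoff(urlopen=urllib.request.urlopen, retries=5, sleep=time.sleep):
    """Retry transient 429/500 responses, sleeping 2**attempt seconds in between."""

    def wrapped(url):
        for attempt in range(retries):
            try:
                return urlopen(url)
            except urllib.error.HTTPError as e:
                # Only rate limits and server errors are worth retrying; give
                # up once the retry budget is exhausted.
                if e.code not in (429, 500) or attempt == retries - 1:
                    raise
                sleep(2**attempt)

    return wrapped
```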

The PR also refactors Spack's hashing utilities (this appears to be `spack.util.crypto`), adding type annotations and splitting the file checksum into a reusable streaming helper:

```diff
@@ -4,10 +4,12 @@
 # SPDX-License-Identifier: (Apache-2.0 OR MIT)
 import hashlib
-from typing import Any, Callable, Dict  # novm
+from typing import BinaryIO, Callable, Dict, Optional

 import llnl.util.tty as tty

+HashFactory = Callable[[], "hashlib._Hash"]
+
 #: Set of hash algorithms that Spack can use, mapped to digest size in bytes
 hashes = {"sha256": 32, "md5": 16, "sha1": 20, "sha224": 28, "sha384": 48, "sha512": 64}
 # Note: keys are ordered by popularity for earliest return in ``hash_key in version_dict`` checks.
@@ -23,7 +25,7 @@
 #: cache of hash functions generated
-_hash_functions: Dict[str, Callable[[], Any]] = {}
+_hash_functions: Dict[str, HashFactory] = {}


 class DeprecatedHash:
@@ -44,55 +46,57 @@ def __call__(self, disable_alert=False):
         return hashlib.new(self.hash_alg)


-def hash_fun_for_algo(algo):
+def hash_fun_for_algo(algo: str) -> HashFactory:
     """Get a function that can perform the specified hash algorithm."""
-    hash_gen = _hash_functions.get(algo)
-    if hash_gen is None:
-        if algo in _deprecated_hash_algorithms:
-            try:
-                hash_gen = DeprecatedHash(algo, tty.debug, disable_security_check=False)
-
-                # call once to get a ValueError if usedforsecurity is needed
-                hash_gen(disable_alert=True)
-            except ValueError:
-                # Some systems may support the 'usedforsecurity' option
-                # so try with that (but display a warning when it is used)
-                hash_gen = DeprecatedHash(algo, tty.warn, disable_security_check=True)
-        else:
-            hash_gen = getattr(hashlib, algo)
-        _hash_functions[algo] = hash_gen
-    return hash_gen
+    fun = _hash_functions.get(algo)
+    if fun:
+        return fun
+    elif algo not in _deprecated_hash_algorithms:
+        _hash_functions[algo] = getattr(hashlib, algo)
+    else:
+        try:
+            deprecated_fun = DeprecatedHash(algo, tty.debug, disable_security_check=False)
+
+            # call once to get a ValueError if usedforsecurity is needed
+            deprecated_fun(disable_alert=True)
+        except ValueError:
+            # Some systems may support the 'usedforsecurity' option
+            # so try with that (but display a warning when it is used)
+            deprecated_fun = DeprecatedHash(algo, tty.warn, disable_security_check=True)
+        _hash_functions[algo] = deprecated_fun
+    return _hash_functions[algo]


-def hash_algo_for_digest(hexdigest):
+def hash_algo_for_digest(hexdigest: str) -> str:
     """Gets name of the hash algorithm for a hex digest."""
-    bytes = len(hexdigest) / 2
-    if bytes not in _size_to_hash:
-        raise ValueError("Spack knows no hash algorithm for this digest: %s" % hexdigest)
-    return _size_to_hash[bytes]
+    algo = _size_to_hash.get(len(hexdigest) // 2)
+    if algo is None:
+        raise ValueError(f"Spack knows no hash algorithm for this digest: {hexdigest}")
+    return algo


-def hash_fun_for_digest(hexdigest):
+def hash_fun_for_digest(hexdigest: str) -> HashFactory:
     """Gets a hash function corresponding to a hex digest."""
     return hash_fun_for_algo(hash_algo_for_digest(hexdigest))


-def checksum(hashlib_algo, filename, **kwargs):
-    """Returns a hex digest of the filename generated using an
-    algorithm from hashlib.
-    """
-    block_size = kwargs.get("block_size", 2**20)
+def checksum_stream(hashlib_algo: HashFactory, fp: BinaryIO, *, block_size: int = 2**20) -> str:
+    """Returns a hex digest of the stream generated using given algorithm from hashlib."""
     hasher = hashlib_algo()
-    with open(filename, "rb") as file:
-        while True:
-            data = file.read(block_size)
-            if not data:
-                break
-            hasher.update(data)
+    while True:
+        data = fp.read(block_size)
+        if not data:
+            break
+        hasher.update(data)
     return hasher.hexdigest()


+def checksum(hashlib_algo: HashFactory, filename: str, *, block_size: int = 2**20) -> str:
+    """Returns a hex digest of the filename generated using an algorithm from hashlib."""
+    with open(filename, "rb") as f:
+        return checksum_stream(hashlib_algo, f, block_size=block_size)
+
+
 class Checker:
     """A checker checks files against one particular hex digest.
     It will automatically determine what hashing algorithm
@@ -115,18 +119,18 @@ class Checker:
        a 1MB (2**20 bytes) buffer.
     """

-    def __init__(self, hexdigest, **kwargs):
+    def __init__(self, hexdigest: str, **kwargs) -> None:
         self.block_size = kwargs.get("block_size", 2**20)
         self.hexdigest = hexdigest
-        self.sum = None
+        self.sum: Optional[str] = None
         self.hash_fun = hash_fun_for_digest(hexdigest)

     @property
-    def hash_name(self):
+    def hash_name(self) -> str:
         """Get the name of the hash function this Checker is using."""
         return self.hash_fun().name.lower()

-    def check(self, filename):
+    def check(self, filename: str) -> bool:
         """Read the file with the specified name and check its checksum
         against self.hexdigest. Return True if they match, False
         otherwise. Actual checksum is stored in self.sum.
```
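
To illustrate the refactor: `checksum` keeps its file-based interface but now delegates to `checksum_stream`, which hashes any binary stream. A small usage sketch, assuming these functions live in `spack.util.crypto` (the file path is made up):

```python
import hashlib
import io

from spack.util.crypto import checksum, checksum_stream

# Hash a file on disk; reads in 1 MiB blocks by default.
file_digest = checksum(hashlib.sha256, "/tmp/blob.tar.gz")

# Hash an in-memory stream with the new streaming variant, e.g. while
# downloading, without writing to disk first.
stream_digest = checksum_stream(hashlib.sha256, io.BytesIO(b"Hello world!"))
```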