Circular import fix: spack.oci.opener -> spack.parser (#47956)

by splitting spack.parser into two modules
This commit is contained in:
Tamara Dahlgren 2024-12-12 01:02:07 -08:00 committed by GitHub
parent 9265991767
commit 9aefbb0e96
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 213 additions and 209 deletions

View File

@ -69,10 +69,8 @@
Digest,
ImageReference,
default_config,
default_index_tag,
default_manifest,
default_tag,
tag_is_spec,
ensure_valid_tag,
)
from spack.oci.oci import (
copy_missing_layers_with_retry,
@ -83,7 +81,6 @@
)
from spack.package_prefs import get_package_dir_permissions, get_package_group
from spack.relocate_text import utf8_paths_to_single_binary_regex
from spack.spec import Spec
from spack.stage import Stage
from spack.util.executable import which
@ -827,10 +824,10 @@ def _read_specs_and_push_index(
contents = read_method(file)
# Need full spec.json name or this gets confused with index.json.
if file.endswith(".json.sig"):
specfile_json = Spec.extract_json_from_clearsig(contents)
fetched_spec = Spec.from_dict(specfile_json)
specfile_json = spack.spec.Spec.extract_json_from_clearsig(contents)
fetched_spec = spack.spec.Spec.from_dict(specfile_json)
elif file.endswith(".json"):
fetched_spec = Spec.from_json(contents)
fetched_spec = spack.spec.Spec.from_json(contents)
else:
continue
@ -1100,7 +1097,7 @@ class ExistsInBuildcache(NamedTuple):
class BuildcacheFiles:
def __init__(self, spec: Spec, local: str, remote: str):
def __init__(self, spec: spack.spec.Spec, local: str, remote: str):
"""
Args:
spec: The spec whose tarball and specfile are being managed.
@ -1130,7 +1127,7 @@ def local_tarball(self) -> str:
return os.path.join(self.local, f"{self.spec.dag_hash()}.tar.gz")
def _exists_in_buildcache(spec: Spec, tmpdir: str, out_url: str) -> ExistsInBuildcache:
def _exists_in_buildcache(spec: spack.spec.Spec, tmpdir: str, out_url: str) -> ExistsInBuildcache:
"""returns a tuple of bools (signed, unsigned, tarball) indicating whether specfiles/tarballs
exist in the buildcache"""
files = BuildcacheFiles(spec, tmpdir, out_url)
@ -1141,7 +1138,11 @@ def _exists_in_buildcache(spec: Spec, tmpdir: str, out_url: str) -> ExistsInBuil
def _url_upload_tarball_and_specfile(
spec: Spec, tmpdir: str, out_url: str, exists: ExistsInBuildcache, signing_key: Optional[str]
spec: spack.spec.Spec,
tmpdir: str,
out_url: str,
exists: ExistsInBuildcache,
signing_key: Optional[str],
):
files = BuildcacheFiles(spec, tmpdir, out_url)
tarball = files.local_tarball()
@ -1314,7 +1315,7 @@ def make_uploader(
)
def _format_spec(spec: Spec) -> str:
def _format_spec(spec: spack.spec.Spec) -> str:
return spec.cformat("{name}{@version}{/hash:7}")
@ -1337,7 +1338,7 @@ def _progress(self):
return f"[{self.n:{digits}}/{self.total}] "
return ""
def start(self, spec: Spec, running: bool) -> None:
def start(self, spec: spack.spec.Spec, running: bool) -> None:
self.n += 1
self.running = running
self.pre = self._progress()
@ -1356,18 +1357,18 @@ def fail(self) -> None:
def _url_push(
specs: List[Spec],
specs: List[spack.spec.Spec],
out_url: str,
signing_key: Optional[str],
force: bool,
update_index: bool,
tmpdir: str,
executor: concurrent.futures.Executor,
) -> Tuple[List[Spec], List[Tuple[Spec, BaseException]]]:
) -> Tuple[List[spack.spec.Spec], List[Tuple[spack.spec.Spec, BaseException]]]:
"""Pushes to the provided build cache, and returns a list of skipped specs that were already
present (when force=False), and a list of errors. Does not raise on error."""
skipped: List[Spec] = []
errors: List[Tuple[Spec, BaseException]] = []
skipped: List[spack.spec.Spec] = []
errors: List[Tuple[spack.spec.Spec, BaseException]] = []
exists_futures = [
executor.submit(_exists_in_buildcache, spec, tmpdir, out_url) for spec in specs
@ -1440,7 +1441,7 @@ def _url_push(
return skipped, errors
def _oci_upload_success_msg(spec: Spec, digest: Digest, size: int, elapsed: float):
def _oci_upload_success_msg(spec: spack.spec.Spec, digest: Digest, size: int, elapsed: float):
elapsed = max(elapsed, 0.001) # guard against division by zero
return (
f"Pushed {_format_spec(spec)}: {digest} ({elapsed:.2f}s, "
@ -1526,7 +1527,7 @@ def _oci_put_manifest(
):
architecture = _oci_archspec_to_gooarch(specs[0])
expected_blobs: List[Spec] = [
expected_blobs: List[spack.spec.Spec] = [
s
for s in traverse.traverse_nodes(specs, order="topo", deptype=("link", "run"), root=True)
if not s.external
@ -1640,19 +1641,33 @@ def _oci_update_base_images(
)
def _oci_default_tag(spec: spack.spec.Spec) -> str:
    """Build the default image tag for *spec*, sanitized for OCI registries.

    The tag encodes name, version, and DAG hash so it uniquely identifies the
    concrete spec in the registry.
    """
    raw_tag = f"{spec.name}-{spec.version}-{spec.dag_hash()}.spack"
    return ensure_valid_tag(raw_tag)
#: Tag under which the buildcache index manifest is stored
default_index_tag = "index.spack"


def tag_is_spec(tag: str) -> bool:
    """Return True if *tag* looks like it identifies a spec (not the index)."""
    if tag == default_index_tag:
        # the index shares the ".spack" suffix but is not a spec
        return False
    return tag.endswith(".spack")
def _oci_push(
*,
target_image: ImageReference,
base_image: Optional[ImageReference],
installed_specs_with_deps: List[Spec],
installed_specs_with_deps: List[spack.spec.Spec],
tmpdir: str,
executor: concurrent.futures.Executor,
force: bool = False,
) -> Tuple[
List[Spec],
List[spack.spec.Spec],
Dict[str, Tuple[dict, dict]],
Dict[str, spack.oci.oci.Blob],
List[Tuple[Spec, BaseException]],
List[Tuple[spack.spec.Spec, BaseException]],
]:
# Spec dag hash -> blob
checksums: Dict[str, spack.oci.oci.Blob] = {}
@ -1661,13 +1676,15 @@ def _oci_push(
base_images: Dict[str, Tuple[dict, dict]] = {}
# Specs not uploaded because they already exist
skipped: List[Spec] = []
skipped: List[spack.spec.Spec] = []
if not force:
tty.info("Checking for existing specs in the buildcache")
blobs_to_upload = []
tags_to_check = (target_image.with_tag(default_tag(s)) for s in installed_specs_with_deps)
tags_to_check = (
target_image.with_tag(_oci_default_tag(s)) for s in installed_specs_with_deps
)
available_blobs = executor.map(_oci_get_blob_info, tags_to_check)
for spec, maybe_blob in zip(installed_specs_with_deps, available_blobs):
@ -1695,8 +1712,8 @@ def _oci_push(
executor.submit(_oci_push_pkg_blob, target_image, spec, tmpdir) for spec in blobs_to_upload
]
manifests_to_upload: List[Spec] = []
errors: List[Tuple[Spec, BaseException]] = []
manifests_to_upload: List[spack.spec.Spec] = []
errors: List[Tuple[spack.spec.Spec, BaseException]] = []
# And update the spec to blob mapping for successful uploads
for spec, blob_future in zip(blobs_to_upload, blob_futures):
@ -1722,7 +1739,7 @@ def _oci_push(
base_image_cache=base_images,
)
def extra_config(spec: Spec):
def extra_config(spec: spack.spec.Spec):
spec_dict = spec.to_dict(hash=ht.dag_hash)
spec_dict["buildcache_layout_version"] = CURRENT_BUILD_CACHE_LAYOUT_VERSION
spec_dict["binary_cache_checksum"] = {
@ -1738,7 +1755,7 @@ def extra_config(spec: Spec):
_oci_put_manifest,
base_images,
checksums,
target_image.with_tag(default_tag(spec)),
target_image.with_tag(_oci_default_tag(spec)),
tmpdir,
extra_config(spec),
{"org.opencontainers.image.description": spec.format()},
@ -1755,7 +1772,7 @@ def extra_config(spec: Spec):
manifest_progress.start(spec, manifest_future.running())
if error is None:
manifest_progress.ok(
f"Tagged {_format_spec(spec)} as {target_image.with_tag(default_tag(spec))}"
f"Tagged {_format_spec(spec)} as {target_image.with_tag(_oci_default_tag(spec))}"
)
else:
manifest_progress.fail()
@ -1790,7 +1807,7 @@ def _oci_update_index(
db = BuildCacheDatabase(db_root_dir)
for spec_dict in spec_dicts:
spec = Spec.from_dict(spec_dict)
spec = spack.spec.Spec.from_dict(spec_dict)
db.add(spec)
db.mark(spec, "in_buildcache", True)
@ -1905,7 +1922,7 @@ def _get_valid_spec_file(path: str, max_supported_layout: int) -> Tuple[Dict, in
try:
as_string = binary_content.decode("utf-8")
if path.endswith(".json.sig"):
spec_dict = Spec.extract_json_from_clearsig(as_string)
spec_dict = spack.spec.Spec.extract_json_from_clearsig(as_string)
else:
spec_dict = json.loads(as_string)
except Exception as e:
@ -2001,7 +2018,7 @@ def fetch_url_to_mirror(url):
if fetch_url.startswith("oci://"):
ref = spack.oci.image.ImageReference.from_string(
fetch_url[len("oci://") :]
).with_tag(spack.oci.image.default_tag(spec))
).with_tag(_oci_default_tag(spec))
# Fetch the manifest
try:
@ -2245,7 +2262,8 @@ def relocate_package(spec):
]
if analogs:
# Prefer same-name analogs and prefer higher versions
# This matches the preferences in Spec.splice, so we will find same node
# This matches the preferences in spack.spec.Spec.splice, so we
# will find same node
analog = max(analogs, key=lambda a: (a.name == s.name, a.version))
lookup_dag_hash = analog.dag_hash()
@ -2681,10 +2699,10 @@ def try_direct_fetch(spec, mirrors=None):
# are concrete (as they are built) so we need to mark this spec
# concrete on read-in.
if specfile_is_signed:
specfile_json = Spec.extract_json_from_clearsig(specfile_contents)
fetched_spec = Spec.from_dict(specfile_json)
specfile_json = spack.spec.Spec.extract_json_from_clearsig(specfile_contents)
fetched_spec = spack.spec.Spec.from_dict(specfile_json)
else:
fetched_spec = Spec.from_json(specfile_contents)
fetched_spec = spack.spec.Spec.from_json(specfile_contents)
fetched_spec._mark_concrete()
found_specs.append({"mirror_url": mirror.fetch_url, "spec": fetched_spec})
@ -2983,7 +3001,7 @@ def __init__(self, all_architectures):
self.possible_specs = specs
def __call__(self, spec: Spec, **kwargs):
def __call__(self, spec: spack.spec.Spec, **kwargs):
"""
Args:
spec: The spec being searched for
@ -3121,7 +3139,7 @@ def __init__(self, url: str, local_hash, urlopen=None) -> None:
def conditional_fetch(self) -> FetchIndexResult:
"""Download an index from an OCI registry type mirror."""
url_manifest = self.ref.with_tag(spack.oci.image.default_index_tag).manifest_url()
url_manifest = self.ref.with_tag(default_index_tag).manifest_url()
try:
response = self.urlopen(
urllib.request.Request(

View File

@ -29,6 +29,7 @@
import spack.repo
import spack.spec
import spack.store
import spack.token
import spack.traverse as traverse
import spack.user_environment as uenv
import spack.util.spack_json as sjson
@ -168,7 +169,7 @@ def quote_kvp(string: str) -> str:
return string
key, delim, value = match.groups()
return f"{key}{delim}{spack.parser.quote_if_needed(value)}"
return f"{key}{delim}{spack.token.quote_if_needed(value)}"
def parse_specs(

View File

@ -7,8 +7,6 @@
import urllib.parse
from typing import Optional, Union
import spack.spec
# notice: Docker is more strict (no uppercase allowed). We parse image names *with* uppercase
# and normalize, so: example.com/Organization/Name -> example.com/organization/name. Tags are
# case sensitive though.
@ -195,7 +193,7 @@ def __eq__(self, __value: object) -> bool:
)
def _ensure_valid_tag(tag: str) -> str:
def ensure_valid_tag(tag: str) -> str:
"""Ensure a tag is valid for an OCI registry."""
sanitized = re.sub(r"[^\w.-]", "_", tag)
if len(sanitized) > 128:
@ -203,20 +201,6 @@ def _ensure_valid_tag(tag: str) -> str:
return sanitized
def default_tag(spec: "spack.spec.Spec") -> str:
"""Return a valid, default image tag for a spec."""
return _ensure_valid_tag(f"{spec.name}-{spec.version}-{spec.dag_hash()}.spack")
#: Default OCI index tag
default_index_tag = "index.spack"
def tag_is_spec(tag: str) -> bool:
"""Check if a tag is likely a Spec"""
return tag.endswith(".spack") and tag != default_index_tag
def default_config(architecture: str, os: str):
return {
"architecture": architecture,

View File

@ -21,7 +21,7 @@
import spack.config
import spack.mirrors.mirror
import spack.parser
import spack.token
import spack.util.web
from .image import ImageReference
@ -57,7 +57,7 @@ def dispatch_open(fullurl, data=None, timeout=None):
quoted_string = rf'"(?:({qdtext}*)|{quoted_pair})*"'
class TokenType(spack.parser.TokenBase):
class TokenType(spack.token.TokenBase):
AUTH_PARAM = rf"({token}){BWS}={BWS}({token}|{quoted_string})"
# TOKEN68 = r"([A-Za-z0-9\-._~+/]+=*)" # todo... support this?
TOKEN = rf"{tchar}+"
@ -85,7 +85,7 @@ def tokenize(input: str):
scanner = ALL_TOKENS.scanner(input) # type: ignore[attr-defined]
for match in iter(scanner.match, None): # type: ignore[var-annotated]
yield spack.parser.Token(
yield spack.token.Token(
TokenType.__members__[match.lastgroup], # type: ignore[attr-defined]
match.group(), # type: ignore[attr-defined]
match.start(), # type: ignore[attr-defined]
@ -141,7 +141,7 @@ def extract_auth_param(input: str) -> Tuple[str, str]:
return key, value
while True:
token: spack.parser.Token = next(tokens)
token: spack.token.Token = next(tokens)
if mode == State.CHALLENGE:
if token.kind == TokenType.EOF:

View File

@ -57,11 +57,8 @@
specs to avoid ambiguity. Both are provided because ~ can cause shell
expansion when it is the first character in an id typed on the command line.
"""
import enum
import json
import pathlib
import re
import sys
from typing import Iterator, List, Match, Optional
from llnl.util.tty import color
@ -70,9 +67,8 @@
import spack.error
import spack.spec
import spack.version
from spack.error import SpecSyntaxError
from spack.token import FILENAME, Token, TokenBase, strip_quotes_and_unescape
IS_WINDOWS = sys.platform == "win32"
#: Valid name for specs and variants. Here we are not using
#: the previous "w[\w.-]*" since that would match most
#: characters that can be part of a word in any language
@ -87,22 +83,9 @@
HASH = r"[a-zA-Z_0-9]+"
#: A filename starts either with a "." or a "/" or a "{name}/,
# or on Windows, a drive letter followed by a colon and "\"
# or "." or {name}\
WINDOWS_FILENAME = r"(?:\.|[a-zA-Z0-9-_]*\\|[a-zA-Z]:\\)(?:[a-zA-Z0-9-_\.\\]*)(?:\.json|\.yaml)"
UNIX_FILENAME = r"(?:\.|\/|[a-zA-Z0-9-_]*\/)(?:[a-zA-Z0-9-_\.\/]*)(?:\.json|\.yaml)"
if not IS_WINDOWS:
FILENAME = UNIX_FILENAME
else:
FILENAME = WINDOWS_FILENAME
#: These are legal values that *can* be parsed bare, without quotes on the command line.
VALUE = r"(?:[a-zA-Z_0-9\-+\*.,:=\~\/\\]+)"
#: Variant/flag values that match this can be left unquoted in Spack output
NO_QUOTES_NEEDED = re.compile(r"^[a-zA-Z0-9,/_.-]+$")
#: Quoted values can be *anything* in between quotes, including escaped quotes.
QUOTED_VALUE = r"(?:'(?:[^']|(?<=\\)')*'|\"(?:[^\"]|(?<=\\)\")*\")"
@ -113,60 +96,9 @@
#: Regex with groups to use for splitting (optionally propagated) key-value pairs
SPLIT_KVP = re.compile(rf"^({NAME})(==?)(.*)$")
#: Regex to strip quotes. Group 2 will be the unquoted string.
STRIP_QUOTES = re.compile(r"^(['\"])(.*)\1$")
def strip_quotes_and_unescape(string: str) -> str:
"""Remove surrounding single or double quotes from string, if present."""
match = STRIP_QUOTES.match(string)
if not match:
return string
# replace any escaped quotes with bare quotes
quote, result = match.groups()
return result.replace(rf"\{quote}", quote)
def quote_if_needed(value: str) -> str:
"""Add quotes around the value if it requires quotes.
This will add quotes around the value unless it matches ``NO_QUOTES_NEEDED``.
This adds:
* single quotes by default
* double quotes around any value that contains single quotes
If double quotes are used, we json-escape the string. That is, we escape ``\\``,
``"``, and control codes.
"""
if NO_QUOTES_NEEDED.match(value):
return value
return json.dumps(value) if "'" in value else f"'{value}'"
class TokenBase(enum.Enum):
"""Base class for an enum type with a regex value"""
def __new__(cls, *args, **kwargs):
# See
value = len(cls.__members__) + 1
obj = object.__new__(cls)
obj._value_ = value
return obj
def __init__(self, regex):
self.regex = regex
def __str__(self):
return f"{self._name_}"
class TokenType(TokenBase):
"""Enumeration of the different token kinds in the spec grammar.
Order of declaration is extremely important, since text containing specs is parsed with a
single regex obtained by ``"|".join(...)`` of all the regex in the order of declaration.
"""
@ -205,29 +137,6 @@ class ErrorTokenType(TokenBase):
UNEXPECTED = r"(?:.[\s]*)"
class Token:
"""Represents tokens; generated from input by lexer and fed to parse()."""
__slots__ = "kind", "value", "start", "end"
def __init__(
self, kind: TokenBase, value: str, start: Optional[int] = None, end: Optional[int] = None
):
self.kind = kind
self.value = value
self.start = start
self.end = end
def __repr__(self):
return str(self)
def __str__(self):
return f"({self.kind}, {self.value})"
def __eq__(self, other):
return (self.kind == other.kind) and (self.value == other.value)
#: List of all the regexes used to match spec parts, in order of precedence
TOKEN_REGEXES = [rf"(?P<{token}>{token.regex})" for token in TokenType]
#: List of all valid regexes followed by error analysis regexes
@ -299,6 +208,24 @@ def expect(self, *kinds: TokenType):
return self.next_token and self.next_token.kind in kinds
class SpecTokenizationError(spack.error.SpecSyntaxError):
"""Syntax error in a spec string"""
def __init__(self, matches, text):
message = "unexpected tokens in the spec string\n"
message += f"{text}"
underline = "\n"
for match in matches:
if match.lastgroup == str(ErrorTokenType.UNEXPECTED):
underline += f"{'^' * (match.end() - match.start())}"
continue
underline += f"{' ' * (match.end() - match.start())}"
message += color.colorize(f"@*r{{{underline}}}")
super().__init__(message)
class SpecParser:
"""Parse text into specs"""
@ -601,25 +528,7 @@ def parse_one_or_raise(
return result
class SpecTokenizationError(SpecSyntaxError):
"""Syntax error in a spec string"""
def __init__(self, matches, text):
message = "unexpected tokens in the spec string\n"
message += f"{text}"
underline = "\n"
for match in matches:
if match.lastgroup == str(ErrorTokenType.UNEXPECTED):
underline += f"{'^' * (match.end() - match.start())}"
continue
underline += f"{' ' * (match.end() - match.start())}"
message += color.colorize(f"@*r{{{underline}}}")
super().__init__(message)
class SpecParsingError(SpecSyntaxError):
class SpecParsingError(spack.error.SpecSyntaxError):
"""Error when parsing tokens"""
def __init__(self, message, token, text):

View File

@ -84,6 +84,7 @@
import spack.repo
import spack.solver
import spack.store
import spack.token
import spack.traverse as traverse
import spack.util.executable
import spack.util.hash
@ -950,11 +951,11 @@ def __str__(self):
for flag_type, flags in sorted_items:
normal = [f for f in flags if not f.propagate]
if normal:
result += f" {flag_type}={spack.parser.quote_if_needed(' '.join(normal))}"
result += f" {flag_type}={spack.token.quote_if_needed(' '.join(normal))}"
propagated = [f for f in flags if f.propagate]
if propagated:
result += f" {flag_type}=={spack.parser.quote_if_needed(' '.join(propagated))}"
result += f" {flag_type}=={spack.token.quote_if_needed(' '.join(propagated))}"
# TODO: somehow add this space only if something follows in Spec.format()
if sorted_items:

View File

@ -10,6 +10,7 @@
import os
import pathlib
import platform
import re
import shutil
import sys
import tarfile
@ -33,6 +34,7 @@
import spack.hooks.sbang as sbang
import spack.main
import spack.mirrors.mirror
import spack.oci.image
import spack.paths
import spack.spec
import spack.stage
@ -1213,3 +1215,19 @@ def test_download_tarball_with_unsupported_layout_fails(tmp_path, mutable_config
# And there should be a warning about an unsupported layout version.
assert f"Layout version {layout_version} is too new" in capsys.readouterr().err
@pytest.mark.parametrize(
    "spec",
    [
        # Standard case
        "short-name@=1.2.3",
        # Unsupported characters in git version (40-hex commit with "=develop" ref)
        f"git-version@{1:040x}=develop",
        # Too long of a name (raw tag exceeds the registry's length limit)
        f"{'too-long':x<256}@=1.2.3",
    ],
)
def test_default_tag(spec: str):
    """Make sure that computed image tags are valid."""
    # every computed tag must fully match the OCI ``tag`` grammar
    assert re.fullmatch(spack.oci.image.tag, bindist._oci_default_tag(spack.spec.Spec(spec)))

View File

@ -3,12 +3,9 @@
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import re
import pytest
import spack.spec
from spack.oci.image import Digest, ImageReference, default_tag, tag
from spack.oci.image import Digest, ImageReference
@pytest.mark.parametrize(
@ -87,19 +84,3 @@ def test_digest():
# Missing algorithm
with pytest.raises(ValueError):
Digest.from_string(valid_digest)
@pytest.mark.parametrize(
"spec",
[
# Standard case
"short-name@=1.2.3",
# Unsupported characters in git version
f"git-version@{1:040x}=develop",
# Too long of a name
f"{'too-long':x<256}@=1.2.3",
],
)
def test_default_tag(spec: str):
"""Make sure that computed image tags are valid."""
assert re.fullmatch(tag, default_tag(spack.spec.Spec(spec)))

View File

@ -23,7 +23,7 @@
import spack.oci.opener
import spack.spec
from spack.main import SpackCommand
from spack.oci.image import Digest, ImageReference, default_config, default_manifest, default_tag
from spack.oci.image import Digest, ImageReference, default_config, default_manifest
from spack.oci.oci import blob_exists, get_manifest_and_config, upload_blob, upload_manifest
from spack.test.oci.mock_registry import DummyServer, InMemoryOCIRegistry, create_opener
from spack.util.archive import gzip_compressed_tarfile
@ -336,7 +336,7 @@ def put_manifest(base_images, checksums, image_ref, tmpdir, extra_config, annota
# Verify that manifests of mpich/libdwarf are missing due to upload failure.
for name in without_manifest:
tagged_img = image.with_tag(default_tag(mpileaks[name]))
tagged_img = image.with_tag(spack.binary_distribution._oci_default_tag(mpileaks[name]))
with pytest.raises(urllib.error.HTTPError, match="404"):
get_manifest_and_config(tagged_img)
@ -352,7 +352,9 @@ def put_manifest(base_images, checksums, image_ref, tmpdir, extra_config, annota
continue
# This should not raise a 404.
manifest, _ = get_manifest_and_config(image.with_tag(default_tag(s)))
manifest, _ = get_manifest_and_config(
image.with_tag(spack.binary_distribution._oci_default_tag(s))
)
# Collect layer digests
pkg_to_all_digests[s.name] = {layer["digest"] for layer in manifest["layers"]}

View File

@ -11,18 +11,11 @@
import spack.binary_distribution
import spack.cmd
import spack.parser
import spack.platforms.test
import spack.repo
import spack.spec
from spack.parser import (
UNIX_FILENAME,
WINDOWS_FILENAME,
SpecParser,
SpecTokenizationError,
Token,
TokenType,
)
from spack.parser import SpecParser, SpecParsingError, SpecTokenizationError, TokenType
from spack.token import UNIX_FILENAME, WINDOWS_FILENAME, Token
FAIL_ON_WINDOWS = pytest.mark.xfail(
sys.platform == "win32",
@ -1022,7 +1015,7 @@ def test_disambiguate_hash_by_spec(spec1, spec2, constraint, mock_packages, monk
],
)
def test_error_conditions(text, match_string):
with pytest.raises(spack.parser.SpecParsingError, match=match_string):
with pytest.raises(SpecParsingError, match=match_string):
SpecParser(text).next_spec()

97
lib/spack/spack/token.py Normal file
View File

@ -0,0 +1,97 @@
# Copyright 2013-2024 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
"""Generic token support."""
import enum
import json
import re
import sys
from typing import Optional
#: True when running on Windows (affects which filename regex is used)
IS_WINDOWS = sys.platform == "win32"

#: A filename starts either with a "." or a "/" or a "{name}/",
#: or on Windows, a drive letter followed by a colon and "\"
#: or "." or "{name}\"
WINDOWS_FILENAME = r"(?:\.|[a-zA-Z0-9-_]*\\|[a-zA-Z]:\\)(?:[a-zA-Z0-9-_\.\\]*)(?:\.json|\.yaml)"
UNIX_FILENAME = r"(?:\.|\/|[a-zA-Z0-9-_]*\/)(?:[a-zA-Z0-9-_\.\/]*)(?:\.json|\.yaml)"

#: Platform-appropriate filename regex for spec files
FILENAME = WINDOWS_FILENAME if IS_WINDOWS else UNIX_FILENAME

#: Values that match this (e.g., variants, flags) can be left unquoted in Spack output
NO_QUOTES_NEEDED = re.compile(r"^[a-zA-Z0-9,/_.-]+$")

#: Regex to strip quotes. Group 2 will be the unquoted string.
STRIP_QUOTES = re.compile(r"^(['\"])(.*)\1$")
def strip_quotes_and_unescape(string: str) -> str:
    """Drop a matching pair of surrounding quotes and un-escape inner quotes.

    If *string* is not wrapped in matching single or double quotes, it is
    returned unchanged.
    """
    stripped = re.match(r"^(['\"])(.*)\1$", string)
    if stripped is None:
        return string
    quote_char, body = stripped.groups()
    # escaped quotes inside the literal become bare quotes
    return body.replace(rf"\{quote_char}", quote_char)
def quote_if_needed(value: str) -> str:
    """Wrap *value* in quotes unless it is safe to print bare.

    Values made only of shell-safe characters are returned as-is.  Otherwise
    the value is wrapped in single quotes; if it already contains a single
    quote, it is JSON-encoded instead (double quotes, with ``\\``, ``"`` and
    control codes escaped).
    """
    if re.match(r"^[a-zA-Z0-9,/_.-]+$", value):
        return value
    if "'" in value:
        return json.dumps(value)
    return f"'{value}'"
class TokenBase(enum.Enum):
    """Base class for an enum type with a regex value.

    Members are declared as ``NAME = r"regex"``: the declared string becomes
    the member's ``regex`` attribute, while the enum value itself is
    auto-assigned sequentially.
    """

    def __new__(cls, *args, **kwargs):
        # Auto-number members in declaration order (1-based) so the declared
        # regex string is free to be consumed by __init__ below.
        value = len(cls.__members__) + 1
        obj = object.__new__(cls)
        obj._value_ = value
        return obj

    def __init__(self, regex):
        # regex pattern associated with this token kind
        self.regex = regex

    def __str__(self):
        return f"{self._name_}"
class Token:
    """A lexer-produced token: a kind plus the matched text and its span."""

    __slots__ = "kind", "value", "start", "end"

    def __init__(
        self, kind: TokenBase, value: str, start: Optional[int] = None, end: Optional[int] = None
    ):
        self.kind = kind
        self.value = value
        self.start = start
        self.end = end

    def __str__(self):
        return f"({self.kind}, {self.value})"

    def __repr__(self):
        return self.__str__()

    def __eq__(self, other):
        # start/end positions are deliberately ignored for equality
        return (self.kind, self.value) == (other.kind, other.value)

View File

@ -18,8 +18,8 @@
import llnl.util.tty.color
import spack.error as error
import spack.parser
import spack.spec
import spack.token
#: These are variant names used by Spack internally; packages can't use them
reserved_names = [
@ -465,7 +465,7 @@ def __repr__(self) -> str:
def __str__(self) -> str:
delim = "==" if self.propagate else "="
values = spack.parser.quote_if_needed(",".join(str(v) for v in self.value_as_tuple))
values = spack.token.quote_if_needed(",".join(str(v) for v in self.value_as_tuple))
return f"{self.name}{delim}{values}"
@ -514,7 +514,7 @@ def __str__(self) -> str:
values_str = ",".join(str(x) for x in self.value_as_tuple)
delim = "==" if self.propagate else "="
return f"{self.name}{delim}{spack.parser.quote_if_needed(values_str)}"
return f"{self.name}{delim}{spack.token.quote_if_needed(values_str)}"
class SingleValuedVariant(AbstractVariant):
@ -571,7 +571,7 @@ def yaml_entry(self) -> Tuple[str, SerializedValueType]:
def __str__(self) -> str:
delim = "==" if self.propagate else "="
return f"{self.name}{delim}{spack.parser.quote_if_needed(str(self.value))}"
return f"{self.name}{delim}{spack.token.quote_if_needed(str(self.value))}"
class BoolValuedVariant(SingleValuedVariant):