spack.config: cleanup and add type hints (#41741)

This commit is contained in:
Massimiliano Culpo 2023-12-18 17:05:36 +01:00 committed by GitHub
parent 7550a41660
commit af96fef1da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 177 additions and 164 deletions

View File

@ -64,20 +64,14 @@ def setup_parser(subparser):
# List # List
list_parser = sp.add_parser("list", help="list available compilers") list_parser = sp.add_parser("list", help="list available compilers")
list_parser.add_argument( list_parser.add_argument(
"--scope", "--scope", action=arguments.ConfigScope, help="configuration scope to read from"
action=arguments.ConfigScope,
default=lambda: spack.config.default_list_scope(),
help="configuration scope to read from",
) )
# Info # Info
info_parser = sp.add_parser("info", help="show compiler paths") info_parser = sp.add_parser("info", help="show compiler paths")
info_parser.add_argument("compiler_spec") info_parser.add_argument("compiler_spec")
info_parser.add_argument( info_parser.add_argument(
"--scope", "--scope", action=arguments.ConfigScope, help="configuration scope to read from"
action=arguments.ConfigScope,
default=lambda: spack.config.default_list_scope(),
help="configuration scope to read from",
) )

View File

@ -202,10 +202,7 @@ def setup_parser(subparser):
# List # List
list_parser = sp.add_parser("list", help=mirror_list.__doc__) list_parser = sp.add_parser("list", help=mirror_list.__doc__)
list_parser.add_argument( list_parser.add_argument(
"--scope", "--scope", action=arguments.ConfigScope, help="configuration scope to read from"
action=arguments.ConfigScope,
default=lambda: spack.config.default_list_scope(),
help="configuration scope to read from",
) )

View File

@ -42,10 +42,7 @@ def setup_parser(subparser):
# List # List
list_parser = sp.add_parser("list", help=repo_list.__doc__) list_parser = sp.add_parser("list", help=repo_list.__doc__)
list_parser.add_argument( list_parser.add_argument(
"--scope", "--scope", action=arguments.ConfigScope, help="configuration scope to read from"
action=arguments.ConfigScope,
default=lambda: spack.config.default_list_scope(),
help="configuration scope to read from",
) )
# Add # Add

View File

@ -35,12 +35,9 @@
import os import os
import re import re
import sys import sys
from contextlib import contextmanager from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Type, Union
from typing import Dict, List, Optional, Union
import llnl.util.lang from llnl.util import filesystem, lang, tty
import llnl.util.tty as tty
from llnl.util.filesystem import mkdirp, rename
import spack.compilers import spack.compilers
import spack.paths import spack.paths
@ -114,28 +111,34 @@
#: Base name for the (internal) overrides scope. #: Base name for the (internal) overrides scope.
_OVERRIDES_BASE_NAME = "overrides-" _OVERRIDES_BASE_NAME = "overrides-"
#: Type used for raw YAML configuration
YamlConfigDict = Dict[str, Any]
class ConfigScope: class ConfigScope:
"""This class represents a configuration scope. """This class represents a configuration scope.
A scope is one directory containing named configuration files. A scope is one directory containing named configuration files.
Each file is a config "section" (e.g., mirrors, compilers, etc). Each file is a config "section" (e.g., mirrors, compilers, etc.).
""" """
def __init__(self, name, path): def __init__(self, name, path) -> None:
self.name = name # scope name. self.name = name # scope name.
self.path = path # path to directory containing configs. self.path = path # path to directory containing configs.
self.sections = syaml.syaml_dict() # sections read from config files. self.sections = syaml.syaml_dict() # sections read from config files.
@property @property
def is_platform_dependent(self): def is_platform_dependent(self) -> bool:
"""Returns true if the scope name is platform specific"""
return os.sep in self.name return os.sep in self.name
def get_section_filename(self, section): def get_section_filename(self, section: str) -> str:
"""Returns the filename associated with a given section"""
_validate_section_name(section) _validate_section_name(section)
return os.path.join(self.path, "%s.yaml" % section) return os.path.join(self.path, f"{section}.yaml")
def get_section(self, section): def get_section(self, section: str) -> Optional[YamlConfigDict]:
"""Returns the data associated with a given section"""
if section not in self.sections: if section not in self.sections:
path = self.get_section_filename(section) path = self.get_section_filename(section)
schema = SECTION_SCHEMAS[section] schema = SECTION_SCHEMAS[section]
@ -143,39 +146,44 @@ def get_section(self, section):
self.sections[section] = data self.sections[section] = data
return self.sections[section] return self.sections[section]
def _write_section(self, section): def _write_section(self, section: str) -> None:
filename = self.get_section_filename(section) filename = self.get_section_filename(section)
data = self.get_section(section) data = self.get_section(section)
if data is None:
return
# We copy data here to avoid adding defaults at write time # We copy data here to avoid adding defaults at write time
validate_data = copy.deepcopy(data) validate_data = copy.deepcopy(data)
validate(validate_data, SECTION_SCHEMAS[section]) validate(validate_data, SECTION_SCHEMAS[section])
try: try:
mkdirp(self.path) filesystem.mkdirp(self.path)
with open(filename, "w") as f: with open(filename, "w") as f:
syaml.dump_config(data, stream=f, default_flow_style=False) syaml.dump_config(data, stream=f, default_flow_style=False)
except (syaml.SpackYAMLError, IOError) as e: except (syaml.SpackYAMLError, OSError) as e:
raise ConfigFileError(f"cannot write to '{filename}'") from e raise ConfigFileError(f"cannot write to '{filename}'") from e
def clear(self): def clear(self) -> None:
"""Empty cached config information.""" """Empty cached config information."""
self.sections = syaml.syaml_dict() self.sections = syaml.syaml_dict()
def __repr__(self): def __repr__(self) -> str:
return "<ConfigScope: %s: %s>" % (self.name, self.path) return f"<ConfigScope: {self.name}: {self.path}>"
class SingleFileScope(ConfigScope): class SingleFileScope(ConfigScope):
"""This class represents a configuration scope in a single YAML file.""" """This class represents a configuration scope in a single YAML file."""
def __init__(self, name, path, schema, yaml_path=None): def __init__(
self, name: str, path: str, schema: YamlConfigDict, yaml_path: Optional[List[str]] = None
) -> None:
"""Similar to ``ConfigScope`` but can be embedded in another schema. """Similar to ``ConfigScope`` but can be embedded in another schema.
Arguments: Arguments:
schema (dict): jsonschema for the file to read schema (dict): jsonschema for the file to read
yaml_path (list): path in the schema where config data can be yaml_path (list): path in the schema where config data can be
found. found.
If the schema accepts the following yaml data, the yaml_path If the schema accepts the following yaml data, the yaml_path
would be ['outer', 'inner'] would be ['outer', 'inner']
@ -187,18 +195,18 @@ def __init__(self, name, path, schema, yaml_path=None):
install_tree: $spack/opt/spack install_tree: $spack/opt/spack
""" """
super().__init__(name, path) super().__init__(name, path)
self._raw_data = None self._raw_data: Optional[YamlConfigDict] = None
self.schema = schema self.schema = schema
self.yaml_path = yaml_path or [] self.yaml_path = yaml_path or []
@property @property
def is_platform_dependent(self): def is_platform_dependent(self) -> bool:
return False return False
def get_section_filename(self, section): def get_section_filename(self, section) -> str:
return self.path return self.path
def get_section(self, section): def get_section(self, section: str) -> Optional[YamlConfigDict]:
# read raw data from the file, which looks like: # read raw data from the file, which looks like:
# { # {
# 'config': { # 'config': {
@ -247,8 +255,8 @@ def get_section(self, section):
return self.sections.get(section, None) return self.sections.get(section, None)
def _write_section(self, section): def _write_section(self, section: str) -> None:
data_to_write = self._raw_data data_to_write: Optional[YamlConfigDict] = self._raw_data
# If there is no existing data, this section SingleFileScope has never # If there is no existing data, this section SingleFileScope has never
# been written to disk. We need to construct the portion of the data # been written to disk. We need to construct the portion of the data
@ -278,18 +286,18 @@ def _write_section(self, section):
validate(data_to_write, self.schema) validate(data_to_write, self.schema)
try: try:
parent = os.path.dirname(self.path) parent = os.path.dirname(self.path)
mkdirp(parent) filesystem.mkdirp(parent)
tmp = os.path.join(parent, ".%s.tmp" % os.path.basename(self.path)) tmp = os.path.join(parent, f".{os.path.basename(self.path)}.tmp")
with open(tmp, "w") as f: with open(tmp, "w") as f:
syaml.dump_config(data_to_write, stream=f, default_flow_style=False) syaml.dump_config(data_to_write, stream=f, default_flow_style=False)
rename(tmp, self.path) filesystem.rename(tmp, self.path)
except (syaml.SpackYAMLError, IOError) as e: except (syaml.SpackYAMLError, OSError) as e:
raise ConfigFileError(f"cannot write to config file {str(e)}") from e raise ConfigFileError(f"cannot write to config file {str(e)}") from e
def __repr__(self): def __repr__(self) -> str:
return "<SingleFileScope: %s: %s>" % (self.name, self.path) return f"<SingleFileScope: {self.name}: {self.path}>"
class ImmutableConfigScope(ConfigScope): class ImmutableConfigScope(ConfigScope):
@ -298,11 +306,11 @@ class ImmutableConfigScope(ConfigScope):
This is used for ConfigScopes passed on the command line. This is used for ConfigScopes passed on the command line.
""" """
def _write_section(self, section): def _write_section(self, section) -> None:
raise ConfigError("Cannot write to immutable scope %s" % self) raise ConfigError(f"Cannot write to immutable scope {self}")
def __repr__(self): def __repr__(self) -> str:
return "<ImmutableConfigScope: %s: %s>" % (self.name, self.path) return f"<ImmutableConfigScope: {self.name}: {self.path}>"
class InternalConfigScope(ConfigScope): class InternalConfigScope(ConfigScope):
@ -313,56 +321,58 @@ class InternalConfigScope(ConfigScope):
override settings from files. override settings from files.
""" """
def __init__(self, name, data=None): def __init__(self, name: str, data: Optional[YamlConfigDict] = None) -> None:
super().__init__(name, None) super().__init__(name, None)
self.sections = syaml.syaml_dict() self.sections = syaml.syaml_dict()
if data: if data is not None:
data = InternalConfigScope._process_dict_keyname_overrides(data) data = InternalConfigScope._process_dict_keyname_overrides(data)
for section in data: for section in data:
dsec = data[section] dsec = data[section]
validate({section: dsec}, SECTION_SCHEMAS[section]) validate({section: dsec}, SECTION_SCHEMAS[section])
self.sections[section] = _mark_internal(syaml.syaml_dict({section: dsec}), name) self.sections[section] = _mark_internal(syaml.syaml_dict({section: dsec}), name)
def get_section_filename(self, section): def get_section_filename(self, section: str) -> str:
raise NotImplementedError("Cannot get filename for InternalConfigScope.") raise NotImplementedError("Cannot get filename for InternalConfigScope.")
def get_section(self, section): def get_section(self, section: str) -> Optional[YamlConfigDict]:
"""Just reads from an internal dictionary.""" """Just reads from an internal dictionary."""
if section not in self.sections: if section not in self.sections:
self.sections[section] = None self.sections[section] = None
return self.sections[section] return self.sections[section]
def _write_section(self, section): def _write_section(self, section: str) -> None:
"""This only validates, as the data is already in memory.""" """This only validates, as the data is already in memory."""
data = self.get_section(section) data = self.get_section(section)
if data is not None: if data is not None:
validate(data, SECTION_SCHEMAS[section]) validate(data, SECTION_SCHEMAS[section])
self.sections[section] = _mark_internal(data, self.name) self.sections[section] = _mark_internal(data, self.name)
def __repr__(self): def __repr__(self) -> str:
return "<InternalConfigScope: %s>" % self.name return f"<InternalConfigScope: {self.name}>"
def clear(self): def clear(self) -> None:
# no cache to clear here. # no cache to clear here.
pass pass
@staticmethod @staticmethod
def _process_dict_keyname_overrides(data): def _process_dict_keyname_overrides(data: YamlConfigDict) -> YamlConfigDict:
"""Turn a trailing `:' in a key name into an override attribute.""" """Turn a trailing `:' in a key name into an override attribute."""
result = {} # Below we have a lot of type directives, since we hack on types and monkey-patch them
# by adding attributes that otherwise they won't have.
result: YamlConfigDict = {}
for sk, sv in data.items(): for sk, sv in data.items():
if sk.endswith(":"): if sk.endswith(":"):
key = syaml.syaml_str(sk[:-1]) key = syaml.syaml_str(sk[:-1])
key.override = True key.override = True # type: ignore[attr-defined]
elif sk.endswith("+"): elif sk.endswith("+"):
key = syaml.syaml_str(sk[:-1]) key = syaml.syaml_str(sk[:-1])
key.prepend = True key.prepend = True # type: ignore[attr-defined]
elif sk.endswith("-"): elif sk.endswith("-"):
key = syaml.syaml_str(sk[:-1]) key = syaml.syaml_str(sk[:-1])
key.append = True key.append = True # type: ignore[attr-defined]
else: else:
key = sk key = sk # type: ignore[assignment]
if isinstance(sv, dict): if isinstance(sv, dict):
result[key] = InternalConfigScope._process_dict_keyname_overrides(sv) result[key] = InternalConfigScope._process_dict_keyname_overrides(sv)
@ -395,7 +405,7 @@ class Configuration:
# convert to typing.OrderedDict when we drop 3.6, or OrderedDict when we reach 3.9 # convert to typing.OrderedDict when we drop 3.6, or OrderedDict when we reach 3.9
scopes: Dict[str, ConfigScope] scopes: Dict[str, ConfigScope]
def __init__(self, *scopes: ConfigScope): def __init__(self, *scopes: ConfigScope) -> None:
"""Initialize a configuration with an initial list of scopes. """Initialize a configuration with an initial list of scopes.
Args: Args:
@ -406,26 +416,26 @@ def __init__(self, *scopes: ConfigScope):
self.scopes = collections.OrderedDict() self.scopes = collections.OrderedDict()
for scope in scopes: for scope in scopes:
self.push_scope(scope) self.push_scope(scope)
self.format_updates: Dict[str, List[str]] = collections.defaultdict(list) self.format_updates: Dict[str, List[ConfigScope]] = collections.defaultdict(list)
@_config_mutator @_config_mutator
def push_scope(self, scope: ConfigScope): def push_scope(self, scope: ConfigScope) -> None:
"""Add a higher precedence scope to the Configuration.""" """Add a higher precedence scope to the Configuration."""
tty.debug("[CONFIGURATION: PUSH SCOPE]: {}".format(str(scope)), level=2) tty.debug(f"[CONFIGURATION: PUSH SCOPE]: {str(scope)}", level=2)
self.scopes[scope.name] = scope self.scopes[scope.name] = scope
@_config_mutator @_config_mutator
def pop_scope(self) -> ConfigScope: def pop_scope(self) -> ConfigScope:
"""Remove the highest precedence scope and return it.""" """Remove the highest precedence scope and return it."""
name, scope = self.scopes.popitem(last=True) # type: ignore[call-arg] name, scope = self.scopes.popitem(last=True) # type: ignore[call-arg]
tty.debug("[CONFIGURATION: POP SCOPE]: {}".format(str(scope)), level=2) tty.debug(f"[CONFIGURATION: POP SCOPE]: {str(scope)}", level=2)
return scope return scope
@_config_mutator @_config_mutator
def remove_scope(self, scope_name: str) -> Optional[ConfigScope]: def remove_scope(self, scope_name: str) -> Optional[ConfigScope]:
"""Remove scope by name; has no effect when ``scope_name`` does not exist""" """Remove scope by name; has no effect when ``scope_name`` does not exist"""
scope = self.scopes.pop(scope_name, None) scope = self.scopes.pop(scope_name, None)
tty.debug("[CONFIGURATION: POP SCOPE]: {}".format(str(scope)), level=2) tty.debug(f"[CONFIGURATION: POP SCOPE]: {str(scope)}", level=2)
return scope return scope
@property @property
@ -482,16 +492,16 @@ def _validate_scope(self, scope: Optional[str]) -> ConfigScope:
else: else:
raise ValueError( raise ValueError(
"Invalid config scope: '%s'. Must be one of %s" % (scope, self.scopes.keys()) f"Invalid config scope: '{scope}'. Must be one of {self.scopes.keys()}"
) )
def get_config_filename(self, scope, section) -> str: def get_config_filename(self, scope: str, section: str) -> str:
"""For some scope and section, get the name of the configuration file.""" """For some scope and section, get the name of the configuration file."""
scope = self._validate_scope(scope) scope = self._validate_scope(scope)
return scope.get_section_filename(section) return scope.get_section_filename(section)
@_config_mutator @_config_mutator
def clear_caches(self): def clear_caches(self) -> None:
"""Clears the caches for configuration files, """Clears the caches for configuration files,
This will cause files to be re-read upon the next request.""" This will cause files to be re-read upon the next request."""
@ -501,7 +511,7 @@ def clear_caches(self):
@_config_mutator @_config_mutator
def update_config( def update_config(
self, section: str, update_data: Dict, scope: Optional[str] = None, force: bool = False self, section: str, update_data: Dict, scope: Optional[str] = None, force: bool = False
): ) -> None:
"""Update the configuration file for a particular scope. """Update the configuration file for a particular scope.
Overwrites contents of a section in a scope with update_data, Overwrites contents of a section in a scope with update_data,
@ -515,10 +525,10 @@ def update_config(
format will fail to update unless ``force`` is True. format will fail to update unless ``force`` is True.
Args: Args:
section (str): section of the configuration to be updated section: section of the configuration to be updated
update_data (dict): data to be used for the update update_data: data to be used for the update
scope (str): scope to be updated scope: scope to be updated
force (str): force the update force: force the update
""" """
if self.format_updates.get(section) and not force: if self.format_updates.get(section) and not force:
msg = ( msg = (
@ -547,7 +557,7 @@ def update_config(
scope._write_section(section) scope._write_section(section)
def get_config(self, section, scope=None): def get_config(self, section: str, scope: Optional[str] = None) -> YamlConfigDict:
"""Get configuration settings for a section. """Get configuration settings for a section.
If ``scope`` is ``None`` or not provided, return the merged contents If ``scope`` is ``None`` or not provided, return the merged contents
@ -574,12 +584,12 @@ def get_config(self, section, scope=None):
""" """
return self._get_config_memoized(section, scope) return self._get_config_memoized(section, scope)
@llnl.util.lang.memoized @lang.memoized
def _get_config_memoized(self, section, scope): def _get_config_memoized(self, section: str, scope: Optional[str]) -> YamlConfigDict:
_validate_section_name(section) _validate_section_name(section)
if scope is None: if scope is None:
scopes = self.scopes.values() scopes = list(self.scopes.values())
else: else:
scopes = [self._validate_scope(scope)] scopes = [self._validate_scope(scope)]
@ -614,7 +624,7 @@ def _get_config_memoized(self, section, scope):
ret = syaml.syaml_dict(ret) ret = syaml.syaml_dict(ret)
return ret return ret
def get(self, path, default=None, scope=None): def get(self, path: str, default: Optional[Any] = None, scope: Optional[str] = None) -> Any:
"""Get a config section or a single value from one. """Get a config section or a single value from one.
Accepts a path syntax that allows us to grab nested config map Accepts a path syntax that allows us to grab nested config map
@ -645,7 +655,7 @@ def get(self, path, default=None, scope=None):
return value return value
@_config_mutator @_config_mutator
def set(self, path, value, scope=None): def set(self, path: str, value: Any, scope: Optional[str] = None) -> None:
"""Convenience function for setting single values in config files. """Convenience function for setting single values in config files.
Accepts the path syntax described in ``get()``. Accepts the path syntax described in ``get()``.
@ -687,21 +697,22 @@ def set(self, path, value, scope=None):
def __iter__(self): def __iter__(self):
"""Iterate over scopes in this configuration.""" """Iterate over scopes in this configuration."""
for scope in self.scopes.values(): yield from self.scopes.values()
yield scope
def print_section(self, section, blame=False): def print_section(self, section: str, blame: bool = False) -> None:
"""Print a configuration to stdout.""" """Print a configuration to stdout."""
try: try:
data = syaml.syaml_dict() data = syaml.syaml_dict()
data[section] = self.get_config(section) data[section] = self.get_config(section)
syaml.dump_config(data, stream=sys.stdout, default_flow_style=False, blame=blame) syaml.dump_config(data, stream=sys.stdout, default_flow_style=False, blame=blame)
except (syaml.SpackYAMLError, IOError) as e: except (syaml.SpackYAMLError, OSError) as e:
raise ConfigError(f"cannot read '{section}' configuration") from e raise ConfigError(f"cannot read '{section}' configuration") from e
@contextmanager @contextlib.contextmanager
def override(path_or_scope, value=None): def override(
path_or_scope: Union[ConfigScope, str], value: Optional[Any] = None
) -> Generator[Union[lang.Singleton, Configuration], None, None]:
"""Simple way to override config settings within a context. """Simple way to override config settings within a context.
Arguments: Arguments:
@ -719,10 +730,10 @@ def override(path_or_scope, value=None):
else: else:
base_name = _OVERRIDES_BASE_NAME base_name = _OVERRIDES_BASE_NAME
# Ensure the new override gets a unique scope name # Ensure the new override gets a unique scope name
current_overrides = [s.name for s in CONFIG.matching_scopes(r"^{0}".format(base_name))] current_overrides = [s.name for s in CONFIG.matching_scopes(rf"^{base_name}")]
num_overrides = len(current_overrides) num_overrides = len(current_overrides)
while True: while True:
scope_name = "{0}{1}".format(base_name, num_overrides) scope_name = f"{base_name}{num_overrides}"
if scope_name in current_overrides: if scope_name in current_overrides:
num_overrides += 1 num_overrides += 1
else: else:
@ -739,12 +750,13 @@ def override(path_or_scope, value=None):
assert scope is overrides assert scope is overrides
#: configuration scopes added on the command line #: configuration scopes added on the command line set by ``spack.main.main()``
#: set by ``spack.main.main()``.
COMMAND_LINE_SCOPES: List[str] = [] COMMAND_LINE_SCOPES: List[str] = []
def _add_platform_scope(cfg, scope_type, name, path): def _add_platform_scope(
cfg: Union[Configuration, lang.Singleton], scope_type: Type[ConfigScope], name: str, path: str
) -> None:
"""Add a platform-specific subdirectory for the current platform.""" """Add a platform-specific subdirectory for the current platform."""
platform = spack.platforms.host().name platform = spack.platforms.host().name
plat_name = os.path.join(name, platform) plat_name = os.path.join(name, platform)
@ -752,7 +764,9 @@ def _add_platform_scope(cfg, scope_type, name, path):
cfg.push_scope(scope_type(plat_name, plat_path)) cfg.push_scope(scope_type(plat_name, plat_path))
def _add_command_line_scopes(cfg, command_line_scopes): def _add_command_line_scopes(
cfg: Union[Configuration, lang.Singleton], command_line_scopes: List[str]
) -> None:
"""Add additional scopes from the --config-scope argument. """Add additional scopes from the --config-scope argument.
Command line scopes are named after their position in the arg list. Command line scopes are named after their position in the arg list.
@ -761,26 +775,22 @@ def _add_command_line_scopes(cfg, command_line_scopes):
# We ensure that these scopes exist and are readable, as they are # We ensure that these scopes exist and are readable, as they are
# provided on the command line by the user. # provided on the command line by the user.
if not os.path.isdir(path): if not os.path.isdir(path):
raise ConfigError("config scope is not a directory: '%s'" % path) raise ConfigError(f"config scope is not a directory: '{path}'")
elif not os.access(path, os.R_OK): elif not os.access(path, os.R_OK):
raise ConfigError("config scope is not readable: '%s'" % path) raise ConfigError(f"config scope is not readable: '{path}'")
# name based on order on the command line # name based on order on the command line
name = "cmd_scope_%d" % i name = f"cmd_scope_{i:d}"
cfg.push_scope(ImmutableConfigScope(name, path)) cfg.push_scope(ImmutableConfigScope(name, path))
_add_platform_scope(cfg, ImmutableConfigScope, name, path) _add_platform_scope(cfg, ImmutableConfigScope, name, path)
def create(): def create() -> Configuration:
"""Singleton Configuration instance. """Singleton Configuration instance.
This constructs one instance associated with this module and returns This constructs one instance associated with this module and returns
it. It is bundled inside a function so that configuration can be it. It is bundled inside a function so that configuration can be
initialized lazily. initialized lazily.
Return:
(Configuration): object for accessing spack configuration
""" """
cfg = Configuration() cfg = Configuration()
@ -829,16 +839,25 @@ def create():
#: This is the singleton configuration instance for Spack. #: This is the singleton configuration instance for Spack.
CONFIG: Union[Configuration, llnl.util.lang.Singleton] = llnl.util.lang.Singleton(create) CONFIG: Union[Configuration, lang.Singleton] = lang.Singleton(create)
def add_from_file(filename, scope=None): def add_from_file(filename: str, scope: Optional[str] = None) -> None:
"""Add updates to a config from a filename""" """Add updates to a config from a filename"""
# Extract internal attributes, if we are dealing with an environment # Extract internal attributes, if we are dealing with an environment
data = read_config_file(filename) data = read_config_file(filename)
if data is None:
return
if spack.schema.env.TOP_LEVEL_KEY in data: if spack.schema.env.TOP_LEVEL_KEY in data:
data = data[spack.schema.env.TOP_LEVEL_KEY] data = data[spack.schema.env.TOP_LEVEL_KEY]
msg = (
"unexpected 'None' value when retrieving configuration. "
"Please submit a bug-report at https://github.com/spack/spack/issues"
)
assert data is not None, msg
# update all sections from config dict # update all sections from config dict
# We have to iterate on keys to keep overrides from the file # We have to iterate on keys to keep overrides from the file
for section in data.keys(): for section in data.keys():
@ -856,7 +875,7 @@ def add_from_file(filename, scope=None):
CONFIG.set(section, new, scope) CONFIG.set(section, new, scope)
def add(fullpath, scope=None): def add(fullpath: str, scope: Optional[str] = None) -> None:
"""Add the given configuration to the specified config scope. """Add the given configuration to the specified config scope.
Add accepts a path. If you want to add from a filename, use add_from_file""" Add accepts a path. If you want to add from a filename, use add_from_file"""
components = process_config_path(fullpath) components = process_config_path(fullpath)
@ -904,12 +923,12 @@ def add(fullpath, scope=None):
CONFIG.set(path, new, scope) CONFIG.set(path, new, scope)
def get(path, default=None, scope=None): def get(path: str, default: Optional[Any] = None, scope: Optional[str] = None) -> Any:
"""Module-level wrapper for ``Configuration.get()``.""" """Module-level wrapper for ``Configuration.get()``."""
return CONFIG.get(path, default, scope) return CONFIG.get(path, default, scope)
def set(path, value, scope=None): def set(path: str, value: Any, scope: Optional[str] = None) -> None:
"""Convenience function for setting single values in config files. """Convenience function for setting single values in config files.
Accepts the path syntax described in ``get()``. Accepts the path syntax described in ``get()``.
@ -917,13 +936,13 @@ def set(path, value, scope=None):
return CONFIG.set(path, value, scope) return CONFIG.set(path, value, scope)
def add_default_platform_scope(platform): def add_default_platform_scope(platform: str) -> None:
plat_name = os.path.join("defaults", platform) plat_name = os.path.join("defaults", platform)
plat_path = os.path.join(CONFIGURATION_DEFAULTS_PATH[1], platform) plat_path = os.path.join(CONFIGURATION_DEFAULTS_PATH[1], platform)
CONFIG.push_scope(ConfigScope(plat_name, plat_path)) CONFIG.push_scope(ConfigScope(plat_name, plat_path))
def scopes(): def scopes() -> Dict[str, ConfigScope]:
"""Convenience function to get list of configuration scopes.""" """Convenience function to get list of configuration scopes."""
return CONFIG.scopes return CONFIG.scopes
@ -947,11 +966,13 @@ def writable_scope_names() -> List[str]:
return list(x.name for x in writable_scopes()) return list(x.name for x in writable_scopes())
def matched_config(cfg_path): def matched_config(cfg_path: str) -> List[Tuple[str, Any]]:
return [(scope, get(cfg_path, scope=scope)) for scope in writable_scope_names()] return [(scope, get(cfg_path, scope=scope)) for scope in writable_scope_names()]
def change_or_add(section_name, find_fn, update_fn): def change_or_add(
section_name: str, find_fn: Callable[[str], bool], update_fn: Callable[[str], None]
) -> None:
"""Change or add a subsection of config, with additional logic to """Change or add a subsection of config, with additional logic to
select a reasonable scope where the change is applied. select a reasonable scope where the change is applied.
@ -994,7 +1015,7 @@ def change_or_add(section_name, find_fn, update_fn):
spack.config.set(section_name, section, scope=scope) spack.config.set(section_name, section, scope=scope)
def update_all(section_name, change_fn): def update_all(section_name: str, change_fn: Callable[[str], bool]) -> None:
"""Change a config section, which may have details duplicated """Change a config section, which may have details duplicated
across multiple scopes. across multiple scopes.
""" """
@ -1006,21 +1027,22 @@ def update_all(section_name, change_fn):
spack.config.set(section_name, section, scope=scope) spack.config.set(section_name, section, scope=scope)
def _validate_section_name(section): def _validate_section_name(section: str) -> None:
"""Exit if the section is not a valid section.""" """Exit if the section is not a valid section."""
if section not in SECTION_SCHEMAS: if section not in SECTION_SCHEMAS:
raise ConfigSectionError( raise ConfigSectionError(
"Invalid config section: '%s'. Options are: %s" f"Invalid config section: '{section}'. Options are: {' '.join(SECTION_SCHEMAS.keys())}"
% (section, " ".join(SECTION_SCHEMAS.keys()))
) )
def validate(data, schema, filename=None): def validate(
data: YamlConfigDict, schema: YamlConfigDict, filename: Optional[str] = None
) -> YamlConfigDict:
"""Validate data read in from a Spack YAML file. """Validate data read in from a Spack YAML file.
Arguments: Arguments:
data (dict or list): data read from a Spack YAML file data: data read from a Spack YAML file
schema (dict or list): jsonschema to validate data schema: jsonschema to validate data
This leverages the line information (start_mark, end_mark) stored This leverages the line information (start_mark, end_mark) stored
on Spack YAML structures. on Spack YAML structures.
@ -1043,7 +1065,9 @@ def validate(data, schema, filename=None):
return test_data return test_data
def read_config_file(filename, schema=None): def read_config_file(
filename: str, schema: Optional[YamlConfigDict] = None
) -> Optional[YamlConfigDict]:
"""Read a YAML configuration file. """Read a YAML configuration file.
User can provide a schema for validation. If no schema is provided, User can provide a schema for validation. If no schema is provided,
@ -1055,17 +1079,17 @@ def read_config_file(filename, schema=None):
if not os.path.exists(filename): if not os.path.exists(filename):
# Ignore nonexistent files. # Ignore nonexistent files.
tty.debug("Skipping nonexistent config path {0}".format(filename), level=3) tty.debug(f"Skipping nonexistent config path {filename}", level=3)
return None return None
elif not os.path.isfile(filename): elif not os.path.isfile(filename):
raise ConfigFileError("Invalid configuration. %s exists but is not a file." % filename) raise ConfigFileError(f"Invalid configuration. {filename} exists but is not a file.")
elif not os.access(filename, os.R_OK): elif not os.access(filename, os.R_OK):
raise ConfigFileError("Config file is not readable: {0}".format(filename)) raise ConfigFileError(f"Config file is not readable: {filename}")
try: try:
tty.debug("Reading config from file {0}".format(filename)) tty.debug(f"Reading config from file {filename}")
with open(filename) as f: with open(filename) as f:
data = syaml.load_config(f) data = syaml.load_config(f)
@ -1083,11 +1107,11 @@ def read_config_file(filename, schema=None):
except syaml.SpackYAMLError as e: except syaml.SpackYAMLError as e:
raise ConfigFileError(str(e)) from e raise ConfigFileError(str(e)) from e
except IOError as e: except OSError as e:
raise ConfigFileError(f"Error reading configuration file {filename}: {str(e)}") from e raise ConfigFileError(f"Error reading configuration file {filename}: {str(e)}") from e
def _override(string): def _override(string: str) -> bool:
"""Test if a spack YAML string is an override. """Test if a spack YAML string is an override.
See ``spack_yaml`` for details. Keys in Spack YAML can end in `::`, See ``spack_yaml`` for details. Keys in Spack YAML can end in `::`,
@ -1098,7 +1122,7 @@ def _override(string):
return hasattr(string, "override") and string.override return hasattr(string, "override") and string.override
def _append(string): def _append(string: str) -> bool:
"""Test if a spack YAML string is an override. """Test if a spack YAML string is an override.
See ``spack_yaml`` for details. Keys in Spack YAML can end in `+:`, See ``spack_yaml`` for details. Keys in Spack YAML can end in `+:`,
@ -1112,7 +1136,7 @@ def _append(string):
return getattr(string, "append", False) return getattr(string, "append", False)
def _prepend(string): def _prepend(string: str) -> bool:
"""Test if a spack YAML string is an override. """Test if a spack YAML string is an override.
See ``spack_yaml`` for details. Keys in Spack YAML can end in `+:`, See ``spack_yaml`` for details. Keys in Spack YAML can end in `+:`,
@ -1184,7 +1208,7 @@ def get_valid_type(path):
return types[schema_type]() return types[schema_type]()
else: else:
return type(None) return type(None)
raise ConfigError("Cannot determine valid type for path '%s'." % path) raise ConfigError(f"Cannot determine valid type for path '{path}'.")
def remove_yaml(dest, source): def remove_yaml(dest, source):
@ -1312,7 +1336,7 @@ def they_are(t):
return copy.copy(source) return copy.copy(source)
def process_config_path(path): def process_config_path(path: str) -> List[str]:
"""Process a path argument to config.set() that may contain overrides ('::' or """Process a path argument to config.set() that may contain overrides ('::' or
trailing ':') trailing ':')
@ -1325,29 +1349,29 @@ def process_config_path(path):
""" """
result = [] result = []
if path.startswith(":"): if path.startswith(":"):
raise syaml.SpackYAMLError("Illegal leading `:' in path `{0}'".format(path), "") raise syaml.SpackYAMLError(f"Illegal leading `:' in path `{path}'", "")
seen_override_in_path = False seen_override_in_path = False
while path: while path:
front, sep, path = path.partition(":") front, sep, path = path.partition(":")
if (sep and not path) or path.startswith(":"): if (sep and not path) or path.startswith(":"):
if seen_override_in_path: if seen_override_in_path:
raise syaml.SpackYAMLError( raise syaml.SpackYAMLError(
"Meaningless second override" " indicator `::' in path `{0}'".format(path), "" f"Meaningless second override indicator `::' in path `{path}'", ""
) )
path = path.lstrip(":") path = path.lstrip(":")
front = syaml.syaml_str(front) front = syaml.syaml_str(front)
front.override = True front.override = True # type: ignore[attr-defined]
seen_override_in_path = True seen_override_in_path = True
elif front.endswith("+"): elif front.endswith("+"):
front = front.rstrip("+") front = front.rstrip("+")
front = syaml.syaml_str(front) front = syaml.syaml_str(front)
front.prepend = True front.prepend = True # type: ignore[attr-defined]
elif front.endswith("-"): elif front.endswith("-"):
front = front.rstrip("-") front = front.rstrip("-")
front = syaml.syaml_str(front) front = syaml.syaml_str(front)
front.append = True front.append = True # type: ignore[attr-defined]
result.append(front) result.append(front)
@ -1367,7 +1391,7 @@ def process_config_path(path):
# #
# Settings for commands that modify configuration # Settings for commands that modify configuration
# #
def default_modify_scope(section="config"): def default_modify_scope(section: str = "config") -> str:
"""Return the config scope that commands should modify by default. """Return the config scope that commands should modify by default.
Commands that modify configuration by default modify the *highest* Commands that modify configuration by default modify the *highest*
@ -1383,23 +1407,15 @@ def default_modify_scope(section="config"):
return CONFIG.highest_precedence_non_platform_scope().name return CONFIG.highest_precedence_non_platform_scope().name
def default_list_scope(): def _update_in_memory(data: YamlConfigDict, section: str) -> bool:
"""Return the config scope that is listed by default.
Commands that list configuration list *all* scopes (merged) by default.
"""
return None
def _update_in_memory(data, section):
"""Update the format of the configuration data in memory. """Update the format of the configuration data in memory.
This function assumes the section is valid (i.e. validation This function assumes the section is valid (i.e. validation
is responsibility of the caller) is responsibility of the caller)
Args: Args:
data (dict): configuration data data: configuration data
section (str): section of the configuration to update section: section of the configuration to update
Returns: Returns:
True if the data was changed, False otherwise True if the data was changed, False otherwise
@ -1409,14 +1425,14 @@ def _update_in_memory(data, section):
return changed return changed
def ensure_latest_format_fn(section): def ensure_latest_format_fn(section: str) -> Callable[[YamlConfigDict], bool]:
"""Return a function that takes as input a dictionary read from """Return a function that takes as input a dictionary read from
a configuration file and update it to the latest format. a configuration file and update it to the latest format.
The function returns True if there was any update, False otherwise. The function returns True if there was any update, False otherwise.
Args: Args:
section (str): section of the configuration e.g. "packages", section: section of the configuration e.g. "packages",
"config", etc. "config", etc.
""" """
# The line below is based on the fact that every module we need # The line below is based on the fact that every module we need
@ -1427,7 +1443,9 @@ def ensure_latest_format_fn(section):
@contextlib.contextmanager @contextlib.contextmanager
def use_configuration(*scopes_or_paths): def use_configuration(
*scopes_or_paths: Union[ConfigScope, str]
) -> Generator[Configuration, None, None]:
"""Use the configuration scopes passed as arguments within the """Use the configuration scopes passed as arguments within the
context manager. context manager.
@ -1451,8 +1469,8 @@ def use_configuration(*scopes_or_paths):
CONFIG = saved_config CONFIG = saved_config
@llnl.util.lang.memoized @lang.memoized
def _config_from(scopes_or_paths): def _config_from(scopes_or_paths: List[Union[ConfigScope, str]]) -> Configuration:
scopes = [] scopes = []
for scope_or_path in scopes_or_paths: for scope_or_path in scopes_or_paths:
# If we have a config scope we are already done # If we have a config scope we are already done
@ -1462,7 +1480,7 @@ def _config_from(scopes_or_paths):
# Otherwise we need to construct it # Otherwise we need to construct it
path = os.path.normpath(scope_or_path) path = os.path.normpath(scope_or_path)
assert os.path.isdir(path), '"{0}" must be a directory'.format(path) assert os.path.isdir(path), f'"{path}" must be a directory'
name = os.path.basename(path) name = os.path.basename(path)
scopes.append(ConfigScope(name, path)) scopes.append(ConfigScope(name, path))
@ -1470,13 +1488,14 @@ def _config_from(scopes_or_paths):
return configuration return configuration
def raw_github_gitlab_url(url): def raw_github_gitlab_url(url: str) -> str:
"""Transform a github URL to the raw form to avoid undesirable html. """Transform a github URL to the raw form to avoid undesirable html.
Args: Args:
url: url to be converted to raw form url: url to be converted to raw form
Returns: (str) raw github/gitlab url or the original url Returns:
Raw github/gitlab url or the original url
""" """
# Note we rely on GitHub to redirect the 'raw' URL returned here to the # Note we rely on GitHub to redirect the 'raw' URL returned here to the
# actual URL under https://raw.githubusercontent.com/ with '/blob' # actual URL under https://raw.githubusercontent.com/ with '/blob'
@ -1529,7 +1548,7 @@ def fetch_remote_configs(url: str, dest_dir: str, skip_existing: bool = True) ->
def _fetch_file(url): def _fetch_file(url):
raw = raw_github_gitlab_url(url) raw = raw_github_gitlab_url(url)
tty.debug("Reading config from url {0}".format(raw)) tty.debug(f"Reading config from url {raw}")
return web_util.fetch_url_text(raw, dest_dir=dest_dir) return web_util.fetch_url_text(raw, dest_dir=dest_dir)
if not url: if not url:
@ -1545,8 +1564,8 @@ def _fetch_file(url):
basename = os.path.basename(config_url) basename = os.path.basename(config_url)
if skip_existing and basename in existing_files: if skip_existing and basename in existing_files:
tty.warn( tty.warn(
"Will not fetch configuration from {0} since a version already" f"Will not fetch configuration from {config_url} since a "
"exists in {1}".format(config_url, dest_dir) f"version already exists in {dest_dir}"
) )
path = os.path.join(dest_dir, basename) path = os.path.join(dest_dir, basename)
else: else:
@ -1558,7 +1577,7 @@ def _fetch_file(url):
if paths: if paths:
return dest_dir if len(paths) > 1 else paths[0] return dest_dir if len(paths) > 1 else paths[0]
raise ConfigFileError("Cannot retrieve configuration (yaml) from {0}".format(url)) raise ConfigFileError(f"Cannot retrieve configuration (yaml) from {url}")
class ConfigError(SpackError): class ConfigError(SpackError):
@ -1576,7 +1595,13 @@ class ConfigFileError(ConfigError):
class ConfigFormatError(ConfigError): class ConfigFormatError(ConfigError):
"""Raised when a configuration format does not match its schema.""" """Raised when a configuration format does not match its schema."""
def __init__(self, validation_error, data, filename=None, line=None): def __init__(
self,
validation_error,
data: YamlConfigDict,
filename: Optional[str] = None,
line: Optional[int] = None,
) -> None:
# spack yaml has its own file/line marks -- try to find them # spack yaml has its own file/line marks -- try to find them
# we prioritize these over the inputs # we prioritize these over the inputs
self.validation_error = validation_error self.validation_error = validation_error
@ -1590,11 +1615,11 @@ def __init__(self, validation_error, data, filename=None, line=None):
# construct location # construct location
location = "<unknown file>" location = "<unknown file>"
if filename: if filename:
location = "%s" % filename location = f"{filename}"
if line is not None: if line is not None:
location += ":%d" % line location += f":{line:d}"
message = "%s: %s" % (location, validation_error.message) message = f"{location}: {validation_error.message}"
super().__init__(message) super().__init__(message)
def _get_mark(self, validation_error, data): def _get_mark(self, validation_error, data):