db: more type hints (#46242)

This commit is contained in:
Harmen Stoppels 2024-09-06 09:13:14 +02:00 committed by GitHub
parent 9d754c127a
commit 7c473937ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -212,7 +212,7 @@ def __init__(
ref_count: int = 0,
explicit: bool = False,
installation_time: Optional[float] = None,
deprecated_for: Optional["spack.spec.Spec"] = None,
deprecated_for: Optional[str] = None,
in_buildcache: bool = False,
origin=None,
):
@ -1028,7 +1028,7 @@ def _check_ref_counts(self):
Does no locking.
"""
counts = {}
counts: Dict[str, int] = {}
for key, rec in self._data.items():
counts.setdefault(key, 0)
for dep in rec.spec.dependencies(deptype=_TRACKED_DEPENDENCIES):
@ -1218,7 +1218,7 @@ def _add(
self._data[key].explicit = explicit
@_autospec
def add(self, spec: "spack.spec.Spec", *, explicit=False) -> None:
def add(self, spec: "spack.spec.Spec", *, explicit: bool = False) -> None:
"""Add spec at path to database, locking and reading DB to sync.
``add()`` will lock and read from the DB on disk.
@ -1229,7 +1229,7 @@ def add(self, spec: "spack.spec.Spec", *, explicit=False) -> None:
with self.write_transaction():
self._add(spec, explicit=explicit)
def _get_matching_spec_key(self, spec, **kwargs):
def _get_matching_spec_key(self, spec: "spack.spec.Spec", **kwargs) -> str:
"""Get the exact spec OR get a single spec that matches."""
key = spec.dag_hash()
upstream, record = self.query_by_spec_hash(key)
@ -1241,12 +1241,12 @@ def _get_matching_spec_key(self, spec, **kwargs):
return key
@_autospec
def get_record(self, spec, **kwargs):
def get_record(self, spec: "spack.spec.Spec", **kwargs) -> Optional[InstallRecord]:
key = self._get_matching_spec_key(spec, **kwargs)
upstream, record = self.query_by_spec_hash(key)
return record
def _decrement_ref_count(self, spec):
def _decrement_ref_count(self, spec: "spack.spec.Spec") -> None:
key = spec.dag_hash()
if key not in self._data:
@ -1263,7 +1263,7 @@ def _decrement_ref_count(self, spec):
for dep in spec.dependencies(deptype=_TRACKED_DEPENDENCIES):
self._decrement_ref_count(dep)
def _increment_ref_count(self, spec):
def _increment_ref_count(self, spec: "spack.spec.Spec") -> None:
key = spec.dag_hash()
if key not in self._data:
@ -1272,14 +1272,14 @@ def _increment_ref_count(self, spec):
rec = self._data[key]
rec.ref_count += 1
def _remove(self, spec):
def _remove(self, spec: "spack.spec.Spec") -> "spack.spec.Spec":
"""Non-locking version of remove(); does real work."""
key = self._get_matching_spec_key(spec)
rec = self._data[key]
# This install prefix is now free for other specs to use, even if the
# spec is only marked uninstalled.
if not rec.spec.external and rec.installed:
if not rec.spec.external and rec.installed and rec.path:
self._installed_prefixes.remove(rec.path)
if rec.ref_count > 0:
@ -1303,7 +1303,7 @@ def _remove(self, spec):
return rec.spec
@_autospec
def remove(self, spec):
def remove(self, spec: "spack.spec.Spec") -> "spack.spec.Spec":
"""Removes a spec from the database. To be called on uninstall.
Reads the database, then:
@ -1318,7 +1318,7 @@ def remove(self, spec):
with self.write_transaction():
return self._remove(spec)
def deprecator(self, spec):
def deprecator(self, spec: "spack.spec.Spec") -> Optional["spack.spec.Spec"]:
"""Return the spec that the given spec is deprecated for, or None"""
with self.read_transaction():
spec_key = self._get_matching_spec_key(spec)
@ -1329,14 +1329,14 @@ def deprecator(self, spec):
else:
return None
def specs_deprecated_by(self, spec):
def specs_deprecated_by(self, spec: "spack.spec.Spec") -> List["spack.spec.Spec"]:
"""Return all specs deprecated in favor of the given spec"""
with self.read_transaction():
return [
rec.spec for rec in self._data.values() if rec.deprecated_for == spec.dag_hash()
]
def _deprecate(self, spec, deprecator):
def _deprecate(self, spec: "spack.spec.Spec", deprecator: "spack.spec.Spec") -> None:
spec_key = self._get_matching_spec_key(spec)
spec_rec = self._data[spec_key]
@ -1354,17 +1354,17 @@ def _deprecate(self, spec, deprecator):
self._data[spec_key] = spec_rec
@_autospec
def mark(self, spec, key, value):
def mark(self, spec: "spack.spec.Spec", key, value) -> None:
"""Mark an arbitrary record on a spec."""
with self.write_transaction():
return self._mark(spec, key, value)
def _mark(self, spec, key, value):
def _mark(self, spec: "spack.spec.Spec", key, value) -> None:
record = self._data[self._get_matching_spec_key(spec)]
setattr(record, key, value)
@_autospec
def deprecate(self, spec, deprecator):
def deprecate(self, spec: "spack.spec.Spec", deprecator: "spack.spec.Spec") -> None:
"""Marks a spec as deprecated in favor of its deprecator"""
with self.write_transaction():
return self._deprecate(spec, deprecator)
@ -1372,16 +1372,16 @@ def deprecate(self, spec, deprecator):
@_autospec
def installed_relatives(
self,
spec,
direction="children",
transitive=True,
spec: "spack.spec.Spec",
direction: str = "children",
transitive: bool = True,
deptype: Union[dt.DepFlag, dt.DepTypes] = dt.ALL,
):
) -> Set["spack.spec.Spec"]:
"""Return installed specs related to this one."""
if direction not in ("parents", "children"):
raise ValueError("Invalid direction: %s" % direction)
relatives = set()
relatives: Set[spack.spec.Spec] = set()
for spec in self.query(spec):
if transitive:
to_add = spec.traverse(direction=direction, root=False, deptype=deptype)
@ -1408,7 +1408,7 @@ def installed_relatives(
return relatives
@_autospec
def installed_extensions_for(self, extendee_spec):
def installed_extensions_for(self, extendee_spec: "spack.spec.Spec"):
"""Returns the specs of all packages that extend the given spec"""
for spec in self.query():
if spec.package.extends(extendee_spec):
@ -1667,7 +1667,7 @@ def unused_specs(
self,
root_hashes: Optional[Container[str]] = None,
deptype: Union[dt.DepFlag, dt.DepTypes] = dt.LINK | dt.RUN,
) -> "List[spack.spec.Spec]":
) -> List["spack.spec.Spec"]:
"""Return all specs that are currently installed but not needed by root specs.
By default, roots are all explicit specs in the database. If a set of root