etags for index.json invalidation, test coverage (#34641)

Implement an alternative strategy for index.json cache invalidation.

The current approach of maintaining index.json / index.json.hash pairs is
problematic because the two files cannot be fetched atomically: the index can
change between the fetch of the hash and the fetch of the index itself, which
leads to races.

The standard solution for cache invalidation is ETags, which are supported by
both the HTTP and S3 protocols and allow conditional fetches.

This PR implements that for the http/https schemes. It should also work for
the s3 scheme, but that requires other PRs to be merged.

It also improves the unit tests for index.json fetches.
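
For readers who haven't used the mechanism: a conditional fetch sends the ETag
from a previous response back in an If-None-Match header, and the server
answers 304 Not Modified when the content is unchanged. A minimal sketch of
that pattern with urllib (the mirror URL is hypothetical; the PR's real
implementation is the EtagIndexFetcher class in the diff below):

import urllib.error
import urllib.request

url = "https://mirror.example.com/build_cache/index.json"  # hypothetical mirror

# First fetch: remember the ETag the server hands back.
response = urllib.request.urlopen(urllib.request.Request(url))
etag = response.headers["Etag"]

# Later fetches: send If-None-Match; 304 means our cached copy is current.
try:
    response = urllib.request.urlopen(
        urllib.request.Request(url, headers={"If-None-Match": etag})
    )
    data = response.read()  # index changed; re-cache it
except urllib.error.HTTPError as e:
    if e.getcode() != 304:
        raise
    # Not modified: the local cache is still valid.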
Harmen Stoppels 2022-12-21 18:41:59 +01:00 committed by GitHub
parent c3e61664cf
commit 4473d5d811
4 changed files with 443 additions and 98 deletions

lib/spack/spack/binary_distribution.py

@@ -9,12 +9,16 @@
 import json
 import multiprocessing.pool
 import os
+import re
 import shutil
 import sys
 import tarfile
 import tempfile
 import time
 import traceback
+import urllib.error
+import urllib.parse
+import urllib.request
 import warnings
 from contextlib import closing
 from urllib.error import HTTPError, URLError
@@ -342,7 +346,6 @@ def update(self, with_cooldown=False):
         for cached_mirror_url in self._local_index_cache:
             cache_entry = self._local_index_cache[cached_mirror_url]
-            cached_index_hash = cache_entry["index_hash"]
             cached_index_path = cache_entry["index_path"]
             if cached_mirror_url in configured_mirror_urls:
                 # Only do a fetch if the last fetch was longer than TTL ago
@@ -361,13 +364,14 @@ def update(self, with_cooldown=False):
                 # May need to fetch the index and update the local caches
                 try:
                     needs_regen = self._fetch_and_cache_index(
-                        cached_mirror_url, expect_hash=cached_index_hash
+                        cached_mirror_url,
+                        cache_entry=cache_entry,
                     )
                     self._last_fetch_times[cached_mirror_url] = (now, True)
                     all_methods_failed = False
-                except FetchCacheError as fetch_error:
+                except FetchIndexError as e:
                     needs_regen = False
-                    fetch_errors.extend(fetch_error.errors)
+                    fetch_errors.append(e)
                     self._last_fetch_times[cached_mirror_url] = (now, False)
                 # The need to regenerate implies a need to clear as well.
                 spec_cache_clear_needed |= needs_regen
@@ -396,20 +400,22 @@ def update(self, with_cooldown=False):
         # already have in our cache must be fetched, stored, and represented
         # locally.
         for mirror_url in configured_mirror_urls:
-            if mirror_url not in self._local_index_cache:
-                # Need to fetch the index and update the local caches
-                try:
-                    needs_regen = self._fetch_and_cache_index(mirror_url)
-                    self._last_fetch_times[mirror_url] = (now, True)
-                    all_methods_failed = False
-                except FetchCacheError as fetch_error:
-                    fetch_errors.extend(fetch_error.errors)
-                    needs_regen = False
-                    self._last_fetch_times[mirror_url] = (now, False)
-                # Generally speaking, a new mirror wouldn't imply the need to
-                # clear the spec cache, so leave it as is.
-                if needs_regen:
-                    spec_cache_regenerate_needed = True
+            if mirror_url in self._local_index_cache:
+                continue
+            # Need to fetch the index and update the local caches
+            try:
+                needs_regen = self._fetch_and_cache_index(mirror_url)
+                self._last_fetch_times[mirror_url] = (now, True)
+                all_methods_failed = False
+            except FetchIndexError as e:
+                fetch_errors.append(e)
+                needs_regen = False
+                self._last_fetch_times[mirror_url] = (now, False)
+            # Generally speaking, a new mirror wouldn't imply the need to
+            # clear the spec cache, so leave it as is.
+            if needs_regen:
+                spec_cache_regenerate_needed = True
 
         self._write_local_index_cache()
@@ -423,7 +429,7 @@ def update(self, with_cooldown=False):
         if spec_cache_regenerate_needed:
             self.regenerate_spec_cache(clear_existing=spec_cache_clear_needed)
 
-    def _fetch_and_cache_index(self, mirror_url, expect_hash=None):
+    def _fetch_and_cache_index(self, mirror_url, cache_entry={}):
         """Fetch a buildcache index file from a remote mirror and cache it.
 
         If we already have a cached index from this mirror, then we first
@@ -431,102 +437,50 @@ def _fetch_and_cache_index(self, mirror_url, expect_hash=None):
 
         Args:
             mirror_url (str): Base url of mirror
-            expect_hash (str): If provided, this hash will be compared against
-                the index hash we retrieve from the mirror, to determine if we
-                need to fetch the index or not.
+            cache_entry (dict): Old cache metadata with keys ``index_hash``, ``index_path``,
+                ``etag``
 
         Returns:
-            True if this function thinks the concrete spec cache,
-                ``_mirrors_for_spec``, should be regenerated. Returns False
-                otherwise.
-        Throws:
-            FetchCacheError: a composite exception.
-        """
-        index_fetch_url = url_util.join(mirror_url, _build_cache_relative_path, "index.json")
-        hash_fetch_url = url_util.join(mirror_url, _build_cache_relative_path, "index.json.hash")
+            True if the local index.json was updated.
 
-        if not web_util.url_exists(index_fetch_url):
-            # A binary mirror is not required to have an index, so avoid
-            # raising FetchCacheError in that case.
+        Throws:
+            FetchIndexError
+        """
+        # TODO: get rid of this request, handle 404 better
+        if not web_util.url_exists(
+            url_util.join(mirror_url, _build_cache_relative_path, "index.json")
+        ):
             return False
 
-        old_cache_key = None
-        fetched_hash = None
-        errors = []
-
-        # Fetch the hash first so we can check if we actually need to fetch
-        # the index itself.
-        try:
-            _, _, fs = web_util.read_from_url(hash_fetch_url)
-            fetched_hash = codecs.getreader("utf-8")(fs).read()
-        except (URLError, web_util.SpackWebError) as url_err:
-            errors.append(
-                RuntimeError(
-                    "Unable to read index hash {0} due to {1}: {2}".format(
-                        hash_fetch_url, url_err.__class__.__name__, str(url_err)
-                    )
-                )
-            )
-
-        # The only case where we'll skip attempting to fetch the buildcache
-        # index from the mirror is when we already have a hash for this
-        # mirror, we were able to retrieve one from the mirror, and
-        # the two hashes are the same.
-        if expect_hash and fetched_hash:
-            if fetched_hash == expect_hash:
-                tty.debug("Cached index for {0} already up to date".format(mirror_url))
-                return False
-            else:
-                # We expected a hash, we fetched a hash, and they were not the
-                # same. If we end up fetching an index successfully and
-                # replacing our entry for this mirror, we should clean up the
-                # existing cache file
-                if mirror_url in self._local_index_cache:
-                    existing_entry = self._local_index_cache[mirror_url]
-                    old_cache_key = existing_entry["index_path"]
-
-        tty.debug("Fetching index from {0}".format(index_fetch_url))
-
-        # Fetch index itself
-        try:
-            _, _, fs = web_util.read_from_url(index_fetch_url)
-            index_object_str = codecs.getreader("utf-8")(fs).read()
-        except (URLError, web_util.SpackWebError) as url_err:
-            errors.append(
-                RuntimeError(
-                    "Unable to read index {0} due to {1}: {2}".format(
-                        index_fetch_url, url_err.__class__.__name__, str(url_err)
-                    )
-                )
-            )
-            raise FetchCacheError(errors)
-
-        locally_computed_hash = compute_hash(index_object_str)
-
-        if fetched_hash is not None and locally_computed_hash != fetched_hash:
-            msg = (
-                "Computed index hash [{0}] did not match remote [{1}, url:{2}] "
-                "indicating error in index transmission"
-            ).format(locally_computed_hash, fetched_hash, hash_fetch_url)
-            errors.append(RuntimeError(msg))
-            # We somehow got an index that doesn't match the remote one, maybe
-            # the next time we try we'll be successful.
-            raise FetchCacheError(errors)
+        etag = cache_entry.get("etag", None)
+        if etag:
+            fetcher = EtagIndexFetcher(mirror_url, etag)
+        else:
+            fetcher = DefaultIndexFetcher(
+                mirror_url, local_hash=cache_entry.get("index_hash", None)
+            )
+
+        result = fetcher.conditional_fetch()
 
+        # Nothing to do
+        if result.fresh:
+            return False
+
+        # Persist new index.json
         url_hash = compute_hash(mirror_url)
-
-        cache_key = "{0}_{1}.json".format(url_hash[:10], locally_computed_hash[:10])
+        cache_key = "{}_{}.json".format(url_hash[:10], result.hash[:10])
         self._index_file_cache.init_entry(cache_key)
         with self._index_file_cache.write_transaction(cache_key) as (old, new):
-            new.write(index_object_str)
+            new.write(result.data)
 
         self._local_index_cache[mirror_url] = {
-            "index_hash": locally_computed_hash,
+            "index_hash": result.hash,
             "index_path": cache_key,
+            "etag": result.etag,
         }
 
         # clean up the old cache_key if necessary
+        old_cache_key = cache_entry.get("index_path", None)
         if old_cache_key:
             self._index_file_cache.remove(old_cache_key)
@@ -623,7 +577,9 @@ class UnsignedPackageException(spack.error.SpackError):
 
 
 def compute_hash(data):
-    return hashlib.sha256(data.encode("utf-8")).hexdigest()
+    if isinstance(data, str):
+        data = data.encode("utf-8")
+    return hashlib.sha256(data).hexdigest()
 
 
 def build_cache_relative_path():
@@ -2413,3 +2369,124 @@ def __call__(self, spec, **kwargs):
        # Matching a spec constraint
        matches = [s for s in self.possible_specs if s.satisfies(spec)]
        return matches

class FetchIndexError(Exception):
    def __str__(self):
        if len(self.args) == 1:
            return str(self.args[0])
        else:
            return "{}, due to: {}".format(self.args[0], self.args[1])


FetchIndexResult = collections.namedtuple("FetchIndexResult", "etag hash data fresh")


class DefaultIndexFetcher:
    """Fetcher for index.json, using separate index.json.hash as cache invalidation strategy"""

    def __init__(self, url, local_hash, urlopen=web_util.urlopen):
        self.url = url
        self.local_hash = local_hash
        self.urlopen = urlopen

    def conditional_fetch(self):
        # Do an intermediate fetch for the hash
        # and a conditional fetch for the contents
        if self.local_hash:
            url_index_hash = url_util.join(self.url, _build_cache_relative_path, "index.json.hash")

            try:
                response = self.urlopen(urllib.request.Request(url_index_hash))
            except urllib.error.URLError as e:
                raise FetchIndexError("Could not fetch {}".format(url_index_hash), e) from e

            # Validate the hash
            remote_hash = response.read(64)
            if not re.match(rb"[a-f\d]{64}$", remote_hash):
                raise FetchIndexError("Invalid hash format in {}".format(url_index_hash))
            remote_hash = remote_hash.decode("utf-8")

            # No need to update further
            if remote_hash == self.local_hash:
                return FetchIndexResult(etag=None, hash=None, data=None, fresh=True)

        # Otherwise, download index.json
        url_index = url_util.join(self.url, _build_cache_relative_path, "index.json")

        try:
            response = self.urlopen(urllib.request.Request(url_index))
        except urllib.error.URLError as e:
            raise FetchIndexError("Could not fetch index from {}".format(url_index), e) from e

        try:
            result = codecs.getreader("utf-8")(response).read()
        except ValueError as e:
            raise FetchIndexError("Remote index {} is invalid".format(url_index), e) from e

        computed_hash = compute_hash(result)

        # We don't handle computed_hash != remote_hash here, which can happen
        # when remote index.json and index.json.hash are out of sync, or if
        # the hash algorithm changed.
        # The most likely scenario is that index.json got updated while we
        # fetched index.json.hash. Warning about an issue thus feels wrong,
        # as it's more of an issue with race conditions in the cache
        # invalidation strategy.

        # For now we only handle etags on http(s), since 304 error handling
        # in s3:// is not there yet.
        if urllib.parse.urlparse(self.url).scheme not in ("http", "https"):
            etag = None
        else:
            etag = web_util.parse_etag(
                response.headers.get("Etag", None) or response.headers.get("etag", None)
            )

        return FetchIndexResult(
            etag=etag,
            hash=computed_hash,
            data=result,
            fresh=False,
        )


class EtagIndexFetcher:
    """Fetcher for index.json, using ETags headers as cache invalidation strategy"""

    def __init__(self, url, etag, urlopen=web_util.urlopen):
        self.url = url
        self.etag = etag
        self.urlopen = urlopen

    def conditional_fetch(self):
        # Just do a conditional fetch immediately
        url = url_util.join(self.url, _build_cache_relative_path, "index.json")
        headers = {
            "User-Agent": web_util.SPACK_USER_AGENT,
            "If-None-Match": '"{}"'.format(self.etag),
        }

        try:
            response = self.urlopen(urllib.request.Request(url, headers=headers))
        except urllib.error.HTTPError as e:
            if e.getcode() == 304:
                # Not modified; that means fresh.
                return FetchIndexResult(etag=None, hash=None, data=None, fresh=True)
            raise FetchIndexError("Could not fetch index {}".format(url), e) from e
        except urllib.error.URLError as e:
            raise FetchIndexError("Could not fetch index {}".format(url), e) from e

        try:
            result = codecs.getreader("utf-8")(response).read()
        except ValueError as e:
            raise FetchIndexError("Remote index {} is invalid".format(url), e) from e

        headers = response.headers
        etag_header_value = headers.get("Etag", None) or headers.get("etag", None)
        return FetchIndexResult(
            etag=web_util.parse_etag(etag_header_value),
            hash=compute_hash(result),
            data=result,
            fresh=False,
        )
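
Taken together, a caller picks the fetcher based on whatever metadata it has
cached for a mirror, exactly as _fetch_and_cache_index does above. A minimal
usage sketch (the mirror URL and the cached metadata are hypothetical):

import spack.binary_distribution as bindist

mirror_url = "https://mirror.example.com"  # hypothetical
cache_entry = {"etag": None, "index_hash": None}  # per-mirror cached metadata

etag = cache_entry.get("etag", None)
if etag:
    # A previous response carried an ETag: one conditional request suffices.
    fetcher = bindist.EtagIndexFetcher(mirror_url, etag)
else:
    # No ETag yet: fall back to comparing index.json.hash with the local hash.
    fetcher = bindist.DefaultIndexFetcher(
        mirror_url, local_hash=cache_entry.get("index_hash", None)
    )

result = fetcher.conditional_fetch()
if not result.fresh:
    # result.data is the new index.json; persist it and store result.hash
    # and result.etag for the next round.
    pass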

lib/spack/spack/test/bindist.py

@@ -3,9 +3,13 @@
 #
 # SPDX-License-Identifier: (Apache-2.0 OR MIT)
 import glob
+import io
 import os
 import platform
 import sys
+import urllib.error
+import urllib.request
+import urllib.response
 
 import py
 import pytest
@@ -666,3 +670,223 @@ def test_text_relocate_if_needed(install_mockery, mock_fetch, monkeypatch, capfd
    assert join_path("bin", "exe") in manifest["text_to_relocate"]
    assert join_path("bin", "otherexe") not in manifest["text_to_relocate"]
    assert join_path("bin", "secretexe") not in manifest["text_to_relocate"]

def test_etag_fetching_304():
    # Test conditional fetch with etags. If the remote hasn't modified the file
    # it returns 304, which is an HTTPError in urllib-land. That should be
    # handled as success, since it means the local cache is up-to-date.
    def response_304(request: urllib.request.Request):
        url = request.get_full_url()
        if url == "https://www.example.com/build_cache/index.json":
            assert request.get_header("If-none-match") == '"112a8bbc1b3f7f185621c1ee335f0502"'
            raise urllib.error.HTTPError(
                url, 304, "Not Modified", hdrs={}, fp=None  # type: ignore[arg-type]
            )
        assert False, "Should not fetch {}".format(url)

    fetcher = bindist.EtagIndexFetcher(
        url="https://www.example.com",
        etag="112a8bbc1b3f7f185621c1ee335f0502",
        urlopen=response_304,
    )

    result = fetcher.conditional_fetch()
    assert isinstance(result, bindist.FetchIndexResult)
    assert result.fresh


def test_etag_fetching_200():
    # Test conditional fetch with etags. The remote has modified the file.
    def response_200(request: urllib.request.Request):
        url = request.get_full_url()
        if url == "https://www.example.com/build_cache/index.json":
            assert request.get_header("If-none-match") == '"112a8bbc1b3f7f185621c1ee335f0502"'
            return urllib.response.addinfourl(
                io.BytesIO(b"Result"),
                headers={"Etag": '"59bcc3ad6775562f845953cf01624225"'},  # type: ignore[arg-type]
                url=url,
                code=200,
            )
        assert False, "Should not fetch {}".format(url)

    fetcher = bindist.EtagIndexFetcher(
        url="https://www.example.com",
        etag="112a8bbc1b3f7f185621c1ee335f0502",
        urlopen=response_200,
    )

    result = fetcher.conditional_fetch()
    assert isinstance(result, bindist.FetchIndexResult)
    assert not result.fresh
    assert result.etag == "59bcc3ad6775562f845953cf01624225"
    assert result.data == "Result"  # decoded utf-8.
    assert result.hash == bindist.compute_hash("Result")


def test_etag_fetching_404():
    # Test conditional fetch with etags. The remote responds 404: the fetcher
    # should raise FetchIndexError rather than treat it as success.
    def response_404(request: urllib.request.Request):
        raise urllib.error.HTTPError(
            request.get_full_url(),
            404,
            "Not found",
            hdrs={"Etag": '"59bcc3ad6775562f845953cf01624225"'},  # type: ignore[arg-type]
            fp=None,
        )

    fetcher = bindist.EtagIndexFetcher(
        url="https://www.example.com",
        etag="112a8bbc1b3f7f185621c1ee335f0502",
        urlopen=response_404,
    )

    with pytest.raises(bindist.FetchIndexError):
        fetcher.conditional_fetch()


def test_default_index_fetch_200():
    index_json = '{"Hello": "World"}'
    index_json_hash = bindist.compute_hash(index_json)

    def urlopen(request: urllib.request.Request):
        url = request.get_full_url()
        if url.endswith("index.json.hash"):
            return urllib.response.addinfourl(  # type: ignore[arg-type]
                io.BytesIO(index_json_hash.encode()),
                headers={},  # type: ignore[arg-type]
                url=url,
                code=200,
            )
        elif url.endswith("index.json"):
            return urllib.response.addinfourl(
                io.BytesIO(index_json.encode()),
                headers={"Etag": '"59bcc3ad6775562f845953cf01624225"'},  # type: ignore[arg-type]
                url=url,
                code=200,
            )
        assert False, "Unexpected request {}".format(url)

    fetcher = bindist.DefaultIndexFetcher(
        url="https://www.example.com", local_hash="outdated", urlopen=urlopen
    )

    result = fetcher.conditional_fetch()

    assert isinstance(result, bindist.FetchIndexResult)
    assert not result.fresh
    assert result.etag == "59bcc3ad6775562f845953cf01624225"
    assert result.data == index_json
    assert result.hash == index_json_hash


def test_default_index_dont_fetch_index_json_hash_if_no_local_hash():
    # When we don't have a local hash, we should not fetch the remote
    # index.json.hash file, but only index.json.
    index_json = '{"Hello": "World"}'
    index_json_hash = bindist.compute_hash(index_json)

    def urlopen(request: urllib.request.Request):
        url = request.get_full_url()
        if url.endswith("index.json"):
            return urllib.response.addinfourl(
                io.BytesIO(index_json.encode()),
                headers={"Etag": '"59bcc3ad6775562f845953cf01624225"'},  # type: ignore[arg-type]
                url=url,
                code=200,
            )
        assert False, "Unexpected request {}".format(url)

    fetcher = bindist.DefaultIndexFetcher(
        url="https://www.example.com", local_hash=None, urlopen=urlopen
    )

    result = fetcher.conditional_fetch()

    assert isinstance(result, bindist.FetchIndexResult)
    assert result.data == index_json
    assert result.hash == index_json_hash
    assert result.etag == "59bcc3ad6775562f845953cf01624225"
    assert not result.fresh


def test_default_index_not_modified():
    index_json = '{"Hello": "World"}'
    index_json_hash = bindist.compute_hash(index_json)

    def urlopen(request: urllib.request.Request):
        url = request.get_full_url()
        if url.endswith("index.json.hash"):
            return urllib.response.addinfourl(
                io.BytesIO(index_json_hash.encode()),
                headers={},  # type: ignore[arg-type]
                url=url,
                code=200,
            )

        # No request to index.json should be made.
        assert False, "Unexpected request {}".format(url)

    fetcher = bindist.DefaultIndexFetcher(
        url="https://www.example.com", local_hash=index_json_hash, urlopen=urlopen
    )

    assert fetcher.conditional_fetch().fresh


@pytest.mark.parametrize("index_json", [b"\xa9", b"!#%^"])
def test_default_index_invalid_hash_file(index_json):
    # Test invalid unicode / invalid hash type
    index_json_hash = bindist.compute_hash(index_json)

    def urlopen(request: urllib.request.Request):
        return urllib.response.addinfourl(
            io.BytesIO(index_json),
            headers={},  # type: ignore[arg-type]
            url=request.get_full_url(),
            code=200,
        )

    fetcher = bindist.DefaultIndexFetcher(
        url="https://www.example.com", local_hash=index_json_hash, urlopen=urlopen
    )

    with pytest.raises(bindist.FetchIndexError, match="Invalid hash format"):
        fetcher.conditional_fetch()


def test_default_index_json_404():
    # index.json.hash can be fetched, but index.json itself is missing: the
    # fetcher should raise FetchIndexError rather than succeed.
    index_json = '{"Hello": "World"}'
    index_json_hash = bindist.compute_hash(index_json)

    def urlopen(request: urllib.request.Request):
        url = request.get_full_url()
        if url.endswith("index.json.hash"):
            return urllib.response.addinfourl(
                io.BytesIO(index_json_hash.encode()),
                headers={},  # type: ignore[arg-type]
                url=url,
                code=200,
            )
        elif url.endswith("index.json"):
            raise urllib.error.HTTPError(
                url,
                code=404,
                msg="Not Found",
                hdrs={"Etag": '"59bcc3ad6775562f845953cf01624225"'},  # type: ignore[arg-type]
                fp=None,
            )

        assert False, "Unexpected fetch {}".format(url)

    fetcher = bindist.DefaultIndexFetcher(
        url="https://www.example.com", local_hash="invalid", urlopen=urlopen
    )

    with pytest.raises(bindist.FetchIndexError, match="Could not fetch index"):
        fetcher.conditional_fetch()

lib/spack/spack/test/web.py

@@ -182,6 +182,20 @@ def test_get_header():
        spack.util.web.get_header(headers, "ContentLength")

def test_etag_parser():
    # This follows rfc7232 to some extent, relaxing the quote requirement.
    assert spack.util.web.parse_etag('"abcdef"') == "abcdef"
    assert spack.util.web.parse_etag("abcdef") == "abcdef"

    # No empty tags
    assert spack.util.web.parse_etag("") is None

    # No embedded quotes or spaces allowed
    assert spack.util.web.parse_etag('"abcdef"ghi"') is None
    assert spack.util.web.parse_etag('"abc def"') is None
    assert spack.util.web.parse_etag("abc def") is None
@pytest.mark.skipif(sys.platform == "win32", reason="Not supported on Windows (yet)")
def test_list_url(tmpdir):
    testpath = str(tmpdir)

lib/spack/spack/util/web.py

@@ -783,6 +783,36 @@ def unfuzz(header):
        raise

def parse_etag(header_value):
    """Parse a strong etag from an ``ETag: <value>`` header value.
    We don't allow for weakness indicators because it's unclear
    what that means for cache invalidation."""
    if header_value is None:
        return None

    # First follow rfc7232 section 2.3 mostly:
    # ETag       = entity-tag
    # entity-tag = [ weak ] opaque-tag
    # weak       = %x57.2F ; "W/", case-sensitive
    # opaque-tag = DQUOTE *etagc DQUOTE
    # etagc      = %x21 / %x23-7E / obs-text
    #            ; VCHAR except double quotes, plus obs-text
    # obs-text   = %x80-FF
    # That means quotes are required.
    valid = re.match(r'"([\x21\x23-\x7e\x80-\xFF]+)"$', header_value)

    if valid:
        return valid.group(1)

    # However, not everybody adheres to the RFC (some servers send
    # wrong etags, but also s3:// is simply a different standard).
    # In that case, it's common that quotes are omitted; everything
    # else stays the same.
    valid = re.match(r"([\x21\x23-\x7e\x80-\xFF]+)$", header_value)
    return valid.group(1) if valid else None

class FetchError(spack.error.SpackError):
    """Superclass for fetch-related errors."""