Improve error handling in urlopen / socket read (#48707)

* Backward compat with Python 3.9, where socket.timeout is not yet an alias of TimeoutError
* Forward compat with Python [unknown], as HTTPResponse.geturl() is deprecated in favor of the .url attribute
* Catch timeouts and other socket errors raised from .read(), not only from urlopen() (see the sketch below)
* Some minor simplifications: json.load(...) accepts a file object in binary mode, so the codecs.getreader("utf-8") wrappers can go
* Fix CDash code that handled errors incorrectly: non-2XX responses now raise instead of being inspected via getcode()
Harmen Stoppels, 2025-01-27 16:59:05 +01:00, committed by GitHub
parent b2a75db030
commit c6202842ed
11 changed files with 144 additions and 202 deletions
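The unifying idea: urllib.error.URLError subclasses OSError, and socket timeouts raised by urlopen() or by a later .read() call are OSErrors too, so a single "except OSError" replaces the scattered (TimeoutError, urllib.error.URLError) tuples. A minimal sketch of the resulting pattern; the fetch_json helper is hypothetical and not part of this commit:

    import json
    import urllib.request

    def fetch_json(url: str, timeout: float = 10.0):
        """Hypothetical helper illustrating the error-handling pattern adopted here."""
        try:
            response = urllib.request.urlopen(url, timeout=timeout)
            # Socket errors can surface from read() as well as from urlopen(),
            # so decode inside the try block. json.load() accepts a binary file
            # object directly; no codecs.getreader("utf-8") wrapper is needed.
            data = json.load(response)
        except OSError as e:  # URLError, socket.timeout / TimeoutError, SSL errors
            raise RuntimeError(f"could not fetch {url}") from e
        # response.url (not the deprecated response.geturl()) holds the final,
        # post-redirect URL, which helps error messages point at the real mirror.
        return data, response.url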

View File

@@ -802,7 +802,7 @@ def url_read_method(url):
try:
_, _, spec_file = web_util.read_from_url(url)
contents = codecs.getreader("utf-8")(spec_file).read()
except web_util.SpackWebError as e:
except (web_util.SpackWebError, OSError) as e:
tty.error(f"Error reading specfile: {url}: {e}")
return contents
@@ -2010,7 +2010,7 @@ def fetch_url_to_mirror(url):
# Download the config = spec.json and the relevant tarball
try:
manifest = json.loads(response.read())
manifest = json.load(response)
spec_digest = spack.oci.image.Digest.from_string(manifest["config"]["digest"])
tarball_digest = spack.oci.image.Digest.from_string(
manifest["layers"][-1]["digest"]
@@ -2596,11 +2596,14 @@ def try_direct_fetch(spec, mirrors=None):
)
try:
_, _, fs = web_util.read_from_url(buildcache_fetch_url_signed_json)
specfile_contents = codecs.getreader("utf-8")(fs).read()
specfile_is_signed = True
except web_util.SpackWebError as e1:
except (web_util.SpackWebError, OSError) as e1:
try:
_, _, fs = web_util.read_from_url(buildcache_fetch_url_json)
except web_util.SpackWebError as e2:
specfile_contents = codecs.getreader("utf-8")(fs).read()
specfile_is_signed = False
except (web_util.SpackWebError, OSError) as e2:
tty.debug(
f"Did not find {specfile_name} on {buildcache_fetch_url_signed_json}",
e1,
@@ -2610,7 +2613,6 @@ def try_direct_fetch(spec, mirrors=None):
f"Did not find {specfile_name} on {buildcache_fetch_url_json}", e2, level=2
)
continue
specfile_contents = codecs.getreader("utf-8")(fs).read()
# read the spec from the build cache file. All specs in build caches
# are concrete (as they are built) so we need to mark this spec
@@ -2704,8 +2706,9 @@ def get_keys(install=False, trust=False, force=False, mirrors=None):
try:
_, _, json_file = web_util.read_from_url(keys_index)
json_index = sjson.load(codecs.getreader("utf-8")(json_file))
except web_util.SpackWebError as url_err:
json_index = sjson.load(json_file)
except (web_util.SpackWebError, OSError, ValueError) as url_err:
# TODO: avoid repeated request
if web_util.url_exists(keys_index):
tty.error(
f"Unable to find public keys in {url_util.format(fetch_url)},"
@@ -2955,11 +2958,11 @@ def get_remote_hash(self):
url_index_hash = url_util.join(self.url, BUILD_CACHE_RELATIVE_PATH, INDEX_HASH_FILE)
try:
response = self.urlopen(urllib.request.Request(url_index_hash, headers=self.headers))
except (TimeoutError, urllib.error.URLError):
remote_hash = response.read(64)
except OSError:
return None
# Validate the hash
remote_hash = response.read(64)
if not re.match(rb"[a-f\d]{64}$", remote_hash):
return None
return remote_hash.decode("utf-8")
@@ -2977,13 +2980,13 @@ def conditional_fetch(self) -> FetchIndexResult:
try:
response = self.urlopen(urllib.request.Request(url_index, headers=self.headers))
except (TimeoutError, urllib.error.URLError) as e:
raise FetchIndexError("Could not fetch index from {}".format(url_index), e) from e
except OSError as e:
raise FetchIndexError(f"Could not fetch index from {url_index}", e) from e
try:
result = codecs.getreader("utf-8")(response).read()
except ValueError as e:
raise FetchIndexError("Remote index {} is invalid".format(url_index), e) from e
except (ValueError, OSError) as e:
raise FetchIndexError(f"Remote index {url_index} is invalid") from e
computed_hash = compute_hash(result)
@@ -3027,12 +3030,12 @@ def conditional_fetch(self) -> FetchIndexResult:
# Not modified; that means fresh.
return FetchIndexResult(etag=None, hash=None, data=None, fresh=True)
raise FetchIndexError(f"Could not fetch index {url}", e) from e
except (TimeoutError, urllib.error.URLError) as e:
except OSError as e: # URLError, socket.timeout, etc.
raise FetchIndexError(f"Could not fetch index {url}", e) from e
try:
result = codecs.getreader("utf-8")(response).read()
except ValueError as e:
except (ValueError, OSError) as e:
raise FetchIndexError(f"Remote index {url} is invalid", e) from e
headers = response.headers
@@ -3064,11 +3067,11 @@ def conditional_fetch(self) -> FetchIndexResult:
headers={"Accept": "application/vnd.oci.image.manifest.v1+json"},
)
)
except (TimeoutError, urllib.error.URLError) as e:
except OSError as e:
raise FetchIndexError(f"Could not fetch manifest from {url_manifest}", e) from e
try:
manifest = json.loads(response.read())
manifest = json.load(response)
except Exception as e:
raise FetchIndexError(f"Remote index {url_manifest} is invalid", e) from e
@@ -3083,14 +3086,16 @@ def conditional_fetch(self) -> FetchIndexResult:
return FetchIndexResult(etag=None, hash=None, data=None, fresh=True)
# Otherwise fetch the blob / index.json
response = self.urlopen(
urllib.request.Request(
url=self.ref.blob_url(index_digest),
headers={"Accept": "application/vnd.oci.image.layer.v1.tar+gzip"},
try:
response = self.urlopen(
urllib.request.Request(
url=self.ref.blob_url(index_digest),
headers={"Accept": "application/vnd.oci.image.layer.v1.tar+gzip"},
)
)
)
result = codecs.getreader("utf-8")(response).read()
result = codecs.getreader("utf-8")(response).read()
except (OSError, ValueError) as e:
raise FetchIndexError(f"Remote index {url_manifest} is invalid", e) from e
# Make sure the blob we download has the advertised hash
if compute_hash(result) != index_digest.digest:

View File

@@ -14,7 +14,6 @@
import zipfile
from collections import namedtuple
from typing import Callable, Dict, List, Set
from urllib.error import HTTPError, URLError
from urllib.request import HTTPHandler, Request, build_opener
import llnl.util.filesystem as fs
@@ -472,12 +471,9 @@ def generate_pipeline(env: ev.Environment, args) -> None:
# Use all unpruned specs to populate the build group for this set
cdash_config = cfg.get("cdash")
if options.cdash_handler and options.cdash_handler.auth_token:
try:
options.cdash_handler.populate_buildgroup(
[options.cdash_handler.build_name(s) for s in pipeline_specs]
)
except (SpackError, HTTPError, URLError, TimeoutError) as err:
tty.warn(f"Problem populating buildgroup: {err}")
options.cdash_handler.populate_buildgroup(
[options.cdash_handler.build_name(s) for s in pipeline_specs]
)
elif cdash_config:
# warn only if there was actually a CDash configuration.
tty.warn("Unable to populate buildgroup without CDash credentials")

View File

@@ -1,23 +1,21 @@
# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import codecs
import copy
import json
import os
import re
import ssl
import sys
import time
from collections import deque
from enum import Enum
from typing import Dict, Generator, List, Optional, Set, Tuple
from urllib.parse import quote, urlencode, urlparse
from urllib.request import HTTPHandler, HTTPSHandler, Request, build_opener
from urllib.request import Request
import llnl.util.filesystem as fs
import llnl.util.tty as tty
from llnl.util.lang import Singleton, memoized
from llnl.util.lang import memoized
import spack.binary_distribution as bindist
import spack.config as cfg
@@ -35,32 +33,11 @@
from spack.reporters.cdash import SPACK_CDASH_TIMEOUT
from spack.reporters.cdash import build_stamp as cdash_build_stamp
def _urlopen():
error_handler = web_util.SpackHTTPDefaultErrorHandler()
# One opener with HTTPS ssl enabled
with_ssl = build_opener(
HTTPHandler(), HTTPSHandler(context=web_util.ssl_create_default_context()), error_handler
)
# One opener with HTTPS ssl disabled
without_ssl = build_opener(
HTTPHandler(), HTTPSHandler(context=ssl._create_unverified_context()), error_handler
)
# And dynamically dispatch based on the config:verify_ssl.
def dispatch_open(fullurl, data=None, timeout=None, verify_ssl=True):
opener = with_ssl if verify_ssl else without_ssl
timeout = timeout or cfg.get("config:connect_timeout", 1)
return opener.open(fullurl, data, timeout)
return dispatch_open
IS_WINDOWS = sys.platform == "win32"
SPACK_RESERVED_TAGS = ["public", "protected", "notary"]
_dyn_mapping_urlopener = Singleton(_urlopen)
# this exists purely for testing purposes
_urlopen = web_util.urlopen
def copy_files_to_artifacts(src, artifacts_dir):
@@ -279,26 +256,25 @@ def copy_test_results(self, source, dest):
reports = fs.join_path(source, "*_Test*.xml")
copy_files_to_artifacts(reports, dest)
def create_buildgroup(self, opener, headers, url, group_name, group_type):
def create_buildgroup(self, headers, url, group_name, group_type):
data = {"newbuildgroup": group_name, "project": self.project, "type": group_type}
enc_data = json.dumps(data).encode("utf-8")
request = Request(url, data=enc_data, headers=headers)
response = opener.open(request, timeout=SPACK_CDASH_TIMEOUT)
response_code = response.getcode()
if response_code not in [200, 201]:
msg = f"Creating buildgroup failed (response code = {response_code})"
tty.warn(msg)
try:
response_text = _urlopen(request, timeout=SPACK_CDASH_TIMEOUT).read()
except OSError as e:
tty.warn(f"Failed to create CDash buildgroup: {e}")
return None
response_text = response.read()
response_json = json.loads(response_text)
build_group_id = response_json["id"]
return build_group_id
try:
response_json = json.loads(response_text)
return response_json["id"]
except (json.JSONDecodeError, KeyError) as e:
tty.warn(f"Failed to parse CDash response: {e}")
return None
def populate_buildgroup(self, job_names):
url = f"{self.url}/api/v1/buildgroup.php"
@@ -308,16 +284,11 @@ def populate_buildgroup(self, job_names):
"Content-Type": "application/json",
}
opener = build_opener(HTTPHandler)
parent_group_id = self.create_buildgroup(opener, headers, url, self.build_group, "Daily")
group_id = self.create_buildgroup(
opener, headers, url, f"Latest {self.build_group}", "Latest"
)
parent_group_id = self.create_buildgroup(headers, url, self.build_group, "Daily")
group_id = self.create_buildgroup(headers, url, f"Latest {self.build_group}", "Latest")
if not parent_group_id or not group_id:
msg = f"Failed to create or retrieve buildgroups for {self.build_group}"
tty.warn(msg)
tty.warn(f"Failed to create or retrieve buildgroups for {self.build_group}")
return
data = {
@@ -329,15 +300,12 @@ def populate_buildgroup(self, job_names):
enc_data = json.dumps(data).encode("utf-8")
request = Request(url, data=enc_data, headers=headers)
request.get_method = lambda: "PUT"
request = Request(url, data=enc_data, headers=headers, method="PUT")
response = opener.open(request, timeout=SPACK_CDASH_TIMEOUT)
response_code = response.getcode()
if response_code != 200:
msg = f"Error response code ({response_code}) in populate_buildgroup"
tty.warn(msg)
try:
_urlopen(request, timeout=SPACK_CDASH_TIMEOUT)
except OSError as e:
tty.warn(f"Failed to populate CDash buildgroup: {e}")
def report_skipped(self, spec: spack.spec.Spec, report_dir: str, reason: Optional[str]):
"""Explicitly report skipping testing of a spec (e.g., it's CI
@@ -735,9 +703,6 @@ def _apply_section(dest, src):
for value in header.values():
value = os.path.expandvars(value)
verify_ssl = mapping.get("verify_ssl", spack.config.get("config:verify_ssl", True))
timeout = mapping.get("timeout", spack.config.get("config:connect_timeout", 1))
required = mapping.get("require", [])
allowed = mapping.get("allow", [])
ignored = mapping.get("ignore", [])
@@ -771,19 +736,15 @@ def job_query(job):
endpoint_url._replace(query=query).geturl(), headers=header, method="GET"
)
try:
response = _dyn_mapping_urlopener(
request, verify_ssl=verify_ssl, timeout=timeout
)
response = _urlopen(request)
config = json.load(response)
except Exception as e:
# For now just ignore any errors from dynamic mapping and continue
# This is still experimental, and failures should not stop CI
# from running normally
tty.warn(f"Failed to fetch dynamic mapping for query:\n\t{query}")
tty.warn(f"{e}")
tty.warn(f"Failed to fetch dynamic mapping for query:\n\t{query}: {e}")
continue
config = json.load(codecs.getreader("utf-8")(response))
# Strip ignore keys
if ignored:
for key in ignored:

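The module-level alias gives tests a seam: production code routes every request through ci.common._urlopen, so a test can substitute a stub without patching web_util.urlopen globally. A sketch of that seam, modeled on the tests later in this commit (the import path and test name are assumptions):

    import spack.ci.common as ci_common
    from spack.test.conftest import MockHTTPResponse

    def test_with_stubbed_urlopen(monkeypatch):
        # Stands in for ci.common._urlopen; accepts the same (request, timeout=...) call.
        def fake_urlopen(request, timeout=None):
            return MockHTTPResponse.with_json(200, "OK", headers={}, body={})

        monkeypatch.setattr(ci_common, "_urlopen", fake_urlopen)
        # ... exercise code that fetches dynamic mappings ...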
View File

@@ -321,9 +321,15 @@ def _fetch_urllib(self, url):
request = urllib.request.Request(url, headers={"User-Agent": web_util.SPACK_USER_AGENT})
if os.path.lexists(save_file):
os.remove(save_file)
try:
response = web_util.urlopen(request)
except (TimeoutError, urllib.error.URLError) as e:
tty.msg(f"Fetching {url}")
with open(save_file, "wb") as f:
shutil.copyfileobj(response, f)
except OSError as e:
# clean up archive on failure.
if self.archive_file:
os.remove(self.archive_file)
@@ -331,14 +337,6 @@ def _fetch_urllib(self, url):
os.remove(save_file)
raise FailedDownloadError(e) from e
tty.msg(f"Fetching {url}")
if os.path.lexists(save_file):
os.remove(save_file)
with open(save_file, "wb") as f:
shutil.copyfileobj(response, f)
# Save the redirected URL for error messages. Sometimes we're redirected to an arbitrary
# mirror that is broken, leading to spurious download failures. In that case it's helpful
# for users to know which URL was actually fetched.
@@ -535,11 +533,16 @@ def __init__(self, *, url: str, checksum: Optional[str] = None, **kwargs):
@_needs_stage
def fetch(self):
file = self.stage.save_filename
tty.msg(f"Fetching {self.url}")
if os.path.lexists(file):
os.remove(file)
try:
response = self._urlopen(self.url)
except (TimeoutError, urllib.error.URLError) as e:
tty.msg(f"Fetching {self.url}")
with open(file, "wb") as f:
shutil.copyfileobj(response, f)
except OSError as e:
# clean up archive on failure.
if self.archive_file:
os.remove(self.archive_file)
@@ -547,12 +550,6 @@ def fetch(self):
os.remove(file)
raise FailedDownloadError(e) from e
if os.path.lexists(file):
os.remove(file)
with open(file, "wb") as f:
shutil.copyfileobj(response, f)
class VCSFetchStrategy(FetchStrategy):
"""Superclass for version control system fetch strategies.

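Both fetchers now share one shape: announce the fetch, stream the body inside the try block so read() errors are caught too, and delete any partial file on failure so a truncated download is never mistaken for a complete one. A condensed sketch of that shape (the download helper is hypothetical):

    import os
    import shutil
    import urllib.request

    def download(url: str, dest: str, timeout: float = 10.0) -> None:
        if os.path.lexists(dest):
            os.remove(dest)
        try:
            response = urllib.request.urlopen(url, timeout=timeout)
            with open(dest, "wb") as f:
                # copyfileobj() calls response.read(), which can itself raise
                # a socket timeout or connection reset, both OSErrors.
                shutil.copyfileobj(response, f)
        except OSError as e:
            if os.path.lexists(dest):
                os.remove(dest)  # never leave a truncated file behind
            raise RuntimeError(f"failed to download {url}") from e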
View File

@@ -7,6 +7,7 @@
import base64
import json
import re
import socket
import time
import urllib.error
import urllib.parse
@@ -410,7 +411,7 @@ def wrapper(*args, **kwargs):
for i in range(retries):
try:
return f(*args, **kwargs)
except (urllib.error.URLError, TimeoutError) as e:
except OSError as e:
# Retry on internal server errors, and rate limit errors
# Potentially this could take into account the Retry-After header
# if registries support it
@@ -420,9 +421,10 @@ def wrapper(*args, **kwargs):
and (500 <= e.code < 600 or e.code == 429)
)
or (
isinstance(e, urllib.error.URLError) and isinstance(e.reason, TimeoutError)
isinstance(e, urllib.error.URLError)
and isinstance(e.reason, socket.timeout)
)
or isinstance(e, TimeoutError)
or isinstance(e, socket.timeout)
):
# Exponential backoff
sleep(2**i)

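For reference, the retry logic above reduces to: retry only transient failures (HTTP 5xx, 429, and timeouts), back off exponentially, and re-raise everything else. A standalone sketch of that pattern, not the exact Spack decorator:

    import socket
    import time
    import urllib.error

    def retry_transient(f, retries: int = 5):
        def wrapper(*args, **kwargs):
            for i in range(retries):
                try:
                    return f(*args, **kwargs)
                except OSError as e:
                    transient = (
                        (isinstance(e, urllib.error.HTTPError)
                         and (500 <= e.code < 600 or e.code == 429))
                        # socket.timeout is an alias of TimeoutError on 3.10+,
                        # but naming it keeps Python 3.9 behavior identical.
                        or isinstance(e, socket.timeout)
                        or (isinstance(e, urllib.error.URLError)
                            and isinstance(e.reason, socket.timeout))
                    )
                    if not transient or i == retries - 1:
                        raise
                    time.sleep(2**i)  # exponential backoff: 1s, 2s, 4s, ...
        return wrapper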
View File

@@ -5,7 +5,6 @@
import os
import pathlib
import shutil
from io import BytesIO
from typing import NamedTuple
import jsonschema
@@ -32,6 +31,7 @@
from spack.schema.buildcache_spec import schema as specfile_schema
from spack.schema.database_index import schema as db_idx_schema
from spack.spec import Spec
from spack.test.conftest import MockHTTPResponse
config_cmd = spack.main.SpackCommand("config")
ci_cmd = spack.main.SpackCommand("ci")
@@ -239,7 +239,7 @@ def test_ci_generate_with_cdash_token(ci_generate_test, tmp_path, mock_binary_in
# That fake token should have resulted in being unable to
# register build group with cdash, but the workload should
# still have been generated.
assert "Problem populating buildgroup" in output
assert "Failed to create or retrieve buildgroups" in output
expected_keys = ["rebuild-index", "stages", "variables", "workflow"]
assert all([key in yaml_contents.keys() for key in expected_keys])
@@ -1548,10 +1548,10 @@ def test_ci_dynamic_mapping_empty(
ci_base_environment,
):
# The test will always return an empty dictionary
def fake_dyn_mapping_urlopener(*args, **kwargs):
return BytesIO("{}".encode())
def _urlopen(*args, **kwargs):
return MockHTTPResponse.with_json(200, "OK", headers={}, body={})
monkeypatch.setattr(ci.common, "_dyn_mapping_urlopener", fake_dyn_mapping_urlopener)
monkeypatch.setattr(ci.common, "_urlopen", _urlopen)
_ = dynamic_mapping_setup(tmpdir)
with tmpdir.as_cwd():
@@ -1572,15 +1572,15 @@ def test_ci_dynamic_mapping_full(
monkeypatch,
ci_base_environment,
):
# The test will always return an empty dictionary
def fake_dyn_mapping_urlopener(*args, **kwargs):
return BytesIO(
json.dumps(
{"variables": {"MY_VAR": "hello"}, "ignored_field": 0, "unallowed_field": 0}
).encode()
def _urlopen(*args, **kwargs):
return MockHTTPResponse.with_json(
200,
"OK",
headers={},
body={"variables": {"MY_VAR": "hello"}, "ignored_field": 0, "unallowed_field": 0},
)
monkeypatch.setattr(ci.common, "_dyn_mapping_urlopener", fake_dyn_mapping_urlopener)
monkeypatch.setattr(ci.common, "_urlopen", _urlopen)
label = dynamic_mapping_setup(tmpdir)
with tmpdir.as_cwd():

View File

@@ -4,9 +4,11 @@
import collections
import datetime
import email.message
import errno
import functools
import inspect
import io
import itertools
import json
import os
@@ -2128,3 +2130,46 @@ def mock_test_cache(tmp_path_factory):
cache_dir = tmp_path_factory.mktemp("cache")
print(cache_dir)
return spack.util.file_cache.FileCache(str(cache_dir))
class MockHTTPResponse(io.IOBase):
"""This is a mock HTTP response, which implements part of http.client.HTTPResponse"""
def __init__(self, status, reason, headers=None, body=None):
self.msg = None
self.version = 11
self.url = None
self.headers = email.message.EmailMessage()
self.status = status
self.code = status
self.reason = reason
self.debuglevel = 0
self._body = body
if headers is not None:
for key, value in headers.items():
self.headers[key] = value
@classmethod
def with_json(cls, status, reason, headers=None, body=None):
"""Create a mock HTTP response with JSON string as body"""
body = io.BytesIO(json.dumps(body).encode("utf-8"))
return cls(status, reason, headers, body)
def read(self, *args, **kwargs):
return self._body.read(*args, **kwargs)
def getheader(self, name, default=None):
return self.headers.get(name, default)
def getheaders(self):
return self.headers.items()
def fileno(self):
return 0
def getcode(self):
return self.status
def info(self):
return self.headers
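Because the mock implements just the slice of http.client.HTTPResponse that callers touch (read, headers, status), it can stand in anywhere a real response is expected, including as input to json.load. A hypothetical usage:

    import json

    response = MockHTTPResponse.with_json(200, "OK", headers={}, body={"id": 42})
    assert response.getcode() == 200
    assert json.load(response) == {"id": 42}  # the body is a binary file object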

View File

@@ -4,7 +4,6 @@
import base64
import email.message
import hashlib
import io
import json
@@ -19,49 +18,7 @@
import spack.oci.oci
from spack.oci.image import Digest
from spack.oci.opener import OCIAuthHandler
class MockHTTPResponse(io.IOBase):
"""This is a mock HTTP response, which implements part of http.client.HTTPResponse"""
def __init__(self, status, reason, headers=None, body=None):
self.msg = None
self.version = 11
self.url = None
self.headers = email.message.EmailMessage()
self.status = status
self.code = status
self.reason = reason
self.debuglevel = 0
self._body = body
if headers is not None:
for key, value in headers.items():
self.headers[key] = value
@classmethod
def with_json(cls, status, reason, headers=None, body=None):
"""Create a mock HTTP response with JSON string as body"""
body = io.BytesIO(json.dumps(body).encode("utf-8"))
return cls(status, reason, headers, body)
def read(self, *args, **kwargs):
return self._body.read(*args, **kwargs)
def getheader(self, name, default=None):
self.headers.get(name, default)
def getheaders(self):
return self.headers.items()
def fileno(self):
return 0
def getcode(self):
return self.status
def info(self):
return self.headers
from spack.test.conftest import MockHTTPResponse
class MiddlewareError(Exception):

View File

@@ -32,6 +32,7 @@
get_bearer_challenge,
parse_www_authenticate,
)
from spack.test.conftest import MockHTTPResponse
from spack.test.oci.mock_registry import (
DummyServer,
DummyServerUrllibHandler,
@@ -39,7 +40,6 @@
InMemoryOCIRegistryWithAuth,
MiddlewareError,
MockBearerTokenServer,
MockHTTPResponse,
create_opener,
)

View File

@@ -354,21 +354,6 @@ def test_url_missing_curl(mutable_config, missing_curl, monkeypatch):
web_util.url_exists("https://example.com/")
def test_url_fetch_text_urllib_bad_returncode(mutable_config, monkeypatch):
class response:
def getcode(self):
return 404
def _read_from_url(*args, **kwargs):
return None, None, response()
monkeypatch.setattr(web_util, "read_from_url", _read_from_url)
mutable_config.set("config:url_fetch_method", "urllib")
with pytest.raises(spack.error.FetchError, match="failed with error code"):
web_util.fetch_url_text("https://example.com/")
def test_url_fetch_text_urllib_web_error(mutable_config, monkeypatch):
def _raise_web_error(*args, **kwargs):
raise web_util.SpackWebError("bad url")
@@ -376,5 +361,5 @@ def _raise_web_error(*args, **kwargs):
monkeypatch.setattr(web_util, "read_from_url", _raise_web_error)
mutable_config.set("config:url_fetch_method", "urllib")
with pytest.raises(spack.error.FetchError, match="fetch failed to verify"):
with pytest.raises(spack.error.FetchError, match="fetch failed"):
web_util.fetch_url_text("https://example.com/")

View File

@@ -209,7 +209,7 @@ def read_from_url(url, accept_content_type=None):
try:
response = urlopen(request)
except (TimeoutError, URLError) as e:
except OSError as e:
raise SpackWebError(f"Download of {url.geturl()} failed: {e.__class__.__name__}: {e}")
if accept_content_type:
@@ -227,7 +227,7 @@ def read_from_url(url, accept_content_type=None):
tty.debug(msg)
return None, None, None
return response.geturl(), response.headers, response
return response.url, response.headers, response
def push_to_url(local_file_path, remote_path, keep_original=True, extra_args=None):
@@ -405,12 +405,6 @@ def fetch_url_text(url, curl: Optional[Executable] = None, dest_dir="."):
try:
_, _, response = read_from_url(url)
returncode = response.getcode()
if returncode and returncode != 200:
raise spack.error.FetchError(
"Urllib failed with error code {0}".format(returncode)
)
output = codecs.getreader("utf-8")(response).read()
if output:
with working_dir(dest_dir, create=True):
@@ -419,8 +413,8 @@ def fetch_url_text(url, curl: Optional[Executable] = None, dest_dir="."):
return path
except SpackWebError as err:
raise spack.error.FetchError("Urllib fetch failed to verify url: {0}".format(str(err)))
except (SpackWebError, OSError, ValueError) as err:
raise spack.error.FetchError(f"Urllib fetch failed: {err}")
return None
@@ -464,7 +458,7 @@ def url_exists(url, curl=None):
timeout=spack.config.get("config:connect_timeout", 10),
)
return True
except (TimeoutError, URLError) as e:
except OSError as e:
tty.debug(f"Failure reading {url}: {e}")
return False
@@ -746,7 +740,7 @@ def _spider(url: urllib.parse.ParseResult, collect_nested: bool, _visited: Set[s
subcalls.append(abs_link)
_visited.add(abs_link)
except (TimeoutError, URLError) as e:
except OSError as e:
tty.debug(f"[SPIDER] Unable to read: {url}")
tty.debug(str(e), level=2)
if isinstance(e, URLError) and isinstance(e.reason, ssl.SSLError):
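The net effect for callers of read_from_url: HTTP error statuses now surface as exceptions from urlopen (via urllib's default handlers) rather than as non-200 codes to inspect afterwards, and read or decode failures must be caught alongside them. The caller-side pattern, sketched under the APIs shown in the hunks above:

    import json

    import spack.util.web as web_util

    try:
        _, _, response = web_util.read_from_url("https://example.com/index.json")
        index = json.load(response)
    except (web_util.SpackWebError, OSError, ValueError) as e:
        # SpackWebError: urlopen failed; OSError: error raised mid-read();
        # ValueError: the body was not valid JSON.
        print(f"fetch failed: {e}")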