binary_distribution: Speed up buildcache update-index (#32796)

This change uses the aws cli, if available, to retrieve spec files
from the mirror to a local temp directory, then parallelizes the
reading of those files from disk using multiprocessing.pool.ThreadPool.

If the aws cli is not available, then a ThreadPool is used to fetch
and read the spec files from the mirror.

Using the aws cli results in a ~16x speedup when recreating the binary
mirror index, while just parallelizing the fetching and reading
results in a ~3x speedup.
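
In code terms, the change is the I/O-bound fan-out sketched below. This is a minimal, self-contained illustration of the pattern rather than the actual helpers from the diff; read_spec_file and read_all are hypothetical stand-ins:

import multiprocessing.pool


def read_spec_file(path_or_url):
    # Stand-in for the real reader: with the aws cli, this opens a file
    # that "aws s3 sync" already placed on local disk; in the fallback
    # case it would fetch the url over HTTP instead.
    with open(path_or_url) as f:
        return f.read()


def read_all(file_list, concurrency=32):
    # Threads (not processes) are the right tool here: every task blocks
    # on disk or network I/O, so the GIL is not the bottleneck.
    tp = multiprocessing.pool.ThreadPool(processes=concurrency)
    try:
        return tp.map(read_spec_file, file_list)
    finally:
        tp.terminate()
        tp.join()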
Scott Wittenburg authored 2022-11-07 13:31:14 -07:00; committed by GitHub
parent 01a5788517
commit 28a77c2821


@@ -7,6 +7,7 @@
 import collections
 import hashlib
 import json
+import multiprocessing.pool
 import os
 import shutil
 import sys
@@ -45,6 +46,7 @@
 from spack.relocate import utf8_paths_to_single_binary_regex
 from spack.spec import Spec
 from spack.stage import Stage
+from spack.util.executable import which
 
 _build_cache_relative_path = "build_cache"
 _build_cache_keys_relative_path = "_pgp"
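
Worth noting: `which` here is Spack's `spack.util.executable.which`, not the shell builtin. It returns a callable `Executable` wrapper, or `None` when the program is not on `PATH`, which is what lets the new code below invoke the aws cli as `aws(*sync_command_args, ...)`. A small sketch of that usage, with the `s3` arguments as placeholders:

from spack.util.executable import which

aws = which("aws")  # Executable wrapper, or None if the aws cli is absent
if aws is not None:
    # An Executable is invoked like a function, one cli token per argument.
    aws("s3", "ls", "s3://my-mirror/build_cache")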
@@ -72,6 +74,10 @@ def __init__(self, errors):
         super(FetchCacheError, self).__init__(self.message)
 
 
+class ListMirrorSpecsError(spack.error.SpackError):
+    """Raised when unable to retrieve list of specs from the mirror"""
+
+
 class BinaryCacheIndex(object):
     """
     The BinaryCacheIndex tracks what specs are available on (usually remote)
@@ -881,37 +887,52 @@ def sign_specfile(key, force, specfile_path):
     spack.util.gpg.sign(key, specfile_path, signed_specfile_path, clearsign=True)
 
 
-def _fetch_spec_from_mirror(spec_url):
-    s = None
-    tty.debug("fetching {0}".format(spec_url))
-    _, _, spec_file = web_util.read_from_url(spec_url)
-    spec_file_contents = codecs.getreader("utf-8")(spec_file).read()
-    # Need full spec.json name or this gets confused with index.json.
-    if spec_url.endswith(".json.sig"):
-        specfile_json = Spec.extract_json_from_clearsig(spec_file_contents)
-        s = Spec.from_dict(specfile_json)
-    elif spec_url.endswith(".json"):
-        s = Spec.from_json(spec_file_contents)
-    elif spec_url.endswith(".yaml"):
-        s = Spec.from_yaml(spec_file_contents)
-    return s
-
-
-def _read_specs_and_push_index(file_list, cache_prefix, db, db_root_dir):
-    for file_path in file_list:
-        try:
-            s = _fetch_spec_from_mirror(url_util.join(cache_prefix, file_path))
-        except (URLError, web_util.SpackWebError) as url_err:
-            tty.error("Error reading specfile: {0}".format(file_path))
-            tty.error(url_err)
-
-        if s:
-            db.add(s, None)
-            db.mark(s, "in_buildcache", True)
+def _read_specs_and_push_index(file_list, read_method, cache_prefix, db, temp_dir, concurrency):
+    """Read all the specs listed in the provided list, using the given thread
+    parallelism, generate the index, and push it to the mirror.
+
+    Args:
+        file_list (list(str)): List of urls or file paths pointing at spec files to read
+        read_method: A function taking a single argument, either a url or a file path,
+            which reads the spec file at that location and returns its contents.
+        cache_prefix (str): prefix of the build cache on s3 where index should be pushed.
+        db: A spack database used for adding specs and then writing the index.
+        temp_dir (str): Location to write index.json and hash for pushing
+        concurrency (int): Number of parallel threads to use when fetching
+
+    Return:
+        None
+    """
+
+    def _fetch_spec_from_mirror(spec_url):
+        spec_file_contents = read_method(spec_url)
+
+        if spec_file_contents:
+            # Need full spec.json name or this gets confused with index.json.
+            if spec_url.endswith(".json.sig"):
+                specfile_json = Spec.extract_json_from_clearsig(spec_file_contents)
+                return Spec.from_dict(specfile_json)
+            if spec_url.endswith(".json"):
+                return Spec.from_json(spec_file_contents)
+            if spec_url.endswith(".yaml"):
+                return Spec.from_yaml(spec_file_contents)
+
+    tp = multiprocessing.pool.ThreadPool(processes=concurrency)
+    try:
+        fetched_specs = tp.map(
+            llnl.util.lang.star(_fetch_spec_from_mirror), [(f,) for f in file_list]
+        )
+    finally:
+        tp.terminate()
+        tp.join()
+
+    for fetched_spec in fetched_specs:
+        db.add(fetched_spec, None)
+        db.mark(fetched_spec, "in_buildcache", True)
 
     # Now generate the index, compute its hash, and push the two files to
     # the mirror.
-    index_json_path = os.path.join(db_root_dir, "index.json")
+    index_json_path = os.path.join(temp_dir, "index.json")
     with open(index_json_path, "w") as f:
         db._write_to_file(f)
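
`ThreadPool.map` passes each element of its iterable as a single positional argument, so the code wraps `_fetch_spec_from_mirror` with `llnl.util.lang.star` and packs each url into a one-element tuple. `star(f)` returns a wrapper that unpacks an argument tuple into `f(*args)`; a sketch of that behavior (not necessarily the exact Spack implementation):

def star(func):
    """Return a wrapper that unpacks a tuple of arguments into func(*args)."""

    def _star(args):
        return func(*args)

    return _star


# tp.map(star(_fetch_spec_from_mirror), [(f,) for f in file_list]) is thus
# a parallel version of [_fetch_spec_from_mirror(f) for f in file_list].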
@@ -921,7 +942,7 @@ def _read_specs_and_push_index(file_list, cache_prefix, db, db_root_dir):
     index_hash = compute_hash(index_string)
 
     # Write the hash out to a local file
-    index_hash_path = os.path.join(db_root_dir, "index.json.hash")
+    index_hash_path = os.path.join(temp_dir, "index.json.hash")
     with open(index_hash_path, "w") as f:
         f.write(index_hash)
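
Publishing `index.json.hash` alongside `index.json` gives clients a cheap currency check: compare the small hash file before re-downloading the potentially large index. A hypothetical client-side check, with the fetch of the remote hash left abstract:

def index_is_current(local_hash_path, remote_hash):
    # remote_hash is the freshly fetched contents of index.json.hash;
    # only re-download index.json when the two disagree.
    try:
        with open(local_hash_path) as f:
            return f.read().strip() == remote_hash.strip()
    except OSError:
        return False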
@@ -942,31 +963,142 @@ def _read_specs_and_push_index(file_list, cache_prefix, db, db_root_dir):
     )
 
 
-def generate_package_index(cache_prefix):
-    """Create the build cache index page.
-
-    Creates (or replaces) the "index.json" page at the location given in
-    cache_prefix. This page contains a link for each binary package (.yaml or
-    .json) under cache_prefix.
+def _specs_from_cache_aws_cli(cache_prefix):
+    """Use aws cli to sync all the specs into a local temporary directory.
+
+    Args:
+        cache_prefix (str): prefix of the build cache on s3
+
+    Return:
+        List of the local file paths and a function that can read each one
+        from the file system.
     """
+    read_fn = None
+    file_list = None
+    aws = which("aws")
+
+    def file_read_method(file_path):
+        with open(file_path) as fd:
+            return fd.read()
+
+    tmpspecsdir = tempfile.mkdtemp()
+    sync_command_args = [
+        "s3",
+        "sync",
+        "--exclude",
+        "*",
+        "--include",
+        "*.spec.json.sig",
+        "--include",
+        "*.spec.json",
+        "--include",
+        "*.spec.yaml",
+        cache_prefix,
+        tmpspecsdir,
+    ]
+
+    try:
+        tty.debug(
+            "Using aws s3 sync to download specs from {0} to {1}".format(cache_prefix, tmpspecsdir)
+        )
+        aws(*sync_command_args, output=os.devnull, error=os.devnull)
+        file_list = fsys.find(tmpspecsdir, ["*.spec.json.sig", "*.spec.json", "*.spec.yaml"])
+        read_fn = file_read_method
+    except Exception:
+        tty.warn("Failed to use aws s3 sync to retrieve specs, falling back to parallel fetch")
+        shutil.rmtree(tmpspecsdir)
+
+    return file_list, read_fn
+
+
+def _specs_from_cache_fallback(cache_prefix):
+    """Use spack.util.web module to get a list of all the specs at the remote url.
+
+    Args:
+        cache_prefix (str): Base url of mirror (location of spec files)
+
+    Return:
+        The list of complete spec file urls and a function that can read each one
+        from its remote location (also using the spack.util.web module).
+    """
+    read_fn = None
+    file_list = None
+
+    def url_read_method(url):
+        contents = None
+        try:
+            _, _, spec_file = web_util.read_from_url(url)
+            contents = codecs.getreader("utf-8")(spec_file).read()
+        except (URLError, web_util.SpackWebError) as url_err:
+            tty.error("Error reading specfile: {0}".format(url))
+            tty.error(url_err)
+        return contents
+
     try:
         file_list = [
-            entry
+            url_util.join(cache_prefix, entry)
             for entry in web_util.list_url(cache_prefix)
             if entry.endswith(".yaml")
             or entry.endswith("spec.json")
             or entry.endswith("spec.json.sig")
         ]
+        read_fn = url_read_method
     except KeyError as inst:
         msg = "No packages at {0}: {1}".format(cache_prefix, inst)
         tty.warn(msg)
-        return
     except Exception as err:
         # If we got some kind of S3 (access denied or other connection
         # error), the first non boto-specific class in the exception
         # hierarchy is Exception. Just print a warning and return
         msg = "Encountered problem listing packages at {0}: {1}".format(cache_prefix, err)
         tty.warn(msg)
-        return
+
+    return file_list, read_fn
+
+
+def _spec_files_from_cache(cache_prefix):
+    """Get a list of all the spec files in the mirror and a function to
+    read them.
+
+    Args:
+        cache_prefix (str): Base url of mirror (location of spec files)
+
+    Return:
+        A tuple where the first item is a list of absolute file paths or
+        urls pointing to the specs that should be read from the mirror,
+        and the second item is a function taking a url or file path and
+        returning the contents of the spec file at that location.
+    """
+    callbacks = []
+    if cache_prefix.startswith("s3"):
+        callbacks.append(_specs_from_cache_aws_cli)
+
+    callbacks.append(_specs_from_cache_fallback)
+
+    for specs_from_cache_fn in callbacks:
+        file_list, read_fn = specs_from_cache_fn(cache_prefix)
+        if file_list:
+            return file_list, read_fn
+
+    raise ListMirrorSpecsError("Failed to get list of specs from {0}".format(cache_prefix))
+
+
+def generate_package_index(cache_prefix, concurrency=32):
+    """Create or replace the build cache index on the given mirror. The
+    buildcache index contains an entry for each binary package under the
+    cache_prefix.
+
+    Args:
+        cache_prefix (str): Base url of binary mirror.
+        concurrency (int): The desired threading concurrency to use when
+            fetching the spec files from the mirror.
+
+    Return:
+        None
+    """
+    try:
+        file_list, read_fn = _spec_files_from_cache(cache_prefix)
+    except ListMirrorSpecsError as err:
+        tty.error("Unable to generate package index, {0}".format(err))
+        return
 
     if any(x.endswith(".yaml") for x in file_list):
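
Two details above are easy to miss. First, the aws cli applies `--exclude`/`--include` filters in order, so `--exclude "*"` followed by the three `--include` patterns syncs only spec files; the helper is effectively running the shell command shown in the comment below. Second, `_spec_files_from_cache` is an ordered fallback chain: the aws path is only attempted for `s3://` mirrors, and any failure there falls through to the web-based listing. A usage sketch with a placeholder mirror url:

# Roughly equivalent shell command for the sync step:
#   aws s3 sync --exclude "*" --include "*.spec.json.sig" \
#       --include "*.spec.json" --include "*.spec.yaml" \
#       s3://my-mirror/build_cache <tmpspecsdir>

file_list, read_fn = _spec_files_from_cache("s3://my-mirror/build_cache")
contents = [read_fn(f) for f in file_list]  # serially; the real code maps this over a ThreadPool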
@@ -989,7 +1121,7 @@ def generate_package_index(cache_prefix):
     )
 
     try:
-        _read_specs_and_push_index(file_list, cache_prefix, db, db_root_dir)
+        _read_specs_and_push_index(file_list, read_fn, cache_prefix, db, db_root_dir, concurrency)
     except Exception as err:
         msg = "Encountered problem pushing package index to {0}: {1}".format(cache_prefix, err)
         tty.warn(msg)
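
For callers nothing changes except the optional `concurrency` knob; a usage sketch (the mirror url is a placeholder):

import spack.binary_distribution as bindist

# Rebuild the buildcache index for a mirror. With the aws cli installed,
# spec files are synced to a local temp dir first; otherwise they are
# fetched over the network. In both cases they are read with a thread pool.
bindist.generate_package_index("s3://my-mirror/build_cache", concurrency=32)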