gitlab ci: reorganize when we check for specs on mirrors (#38626)

Move the logic that checks which mirrors have the specs we need closer
to where that information is actually used.  Also update the staging
summary to include a brief description of why each job was scheduled
or pruned.  If a spec was found on any mirrors, print those mirrors
regardless of whether we scheduled a job for it.
Scott Wittenburg 2023-06-30 10:18:16 -06:00 committed by GitHub
parent dc25da1931
commit af5b93bb97

@@ -139,49 +139,33 @@ def _add_dependency(spec_label, dep_label, deps):
     deps[spec_label].add(dep_label)


-def _get_spec_dependencies(
-    specs, deps, spec_labels, check_index_only=False, mirrors_to_check=None
-):
-    spec_deps_obj = _compute_spec_deps(
-        specs, check_index_only=check_index_only, mirrors_to_check=mirrors_to_check
-    )
+def _get_spec_dependencies(specs, deps, spec_labels):
+    spec_deps_obj = _compute_spec_deps(specs)

     if spec_deps_obj:
         dependencies = spec_deps_obj["dependencies"]
         specs = spec_deps_obj["specs"]

         for entry in specs:
-            spec_labels[entry["label"]] = {
-                "spec": entry["spec"],
-                "needs_rebuild": entry["needs_rebuild"],
-            }
+            spec_labels[entry["label"]] = entry["spec"]

         for entry in dependencies:
             _add_dependency(entry["spec"], entry["depends"], deps)


-def stage_spec_jobs(specs, check_index_only=False, mirrors_to_check=None):
+def stage_spec_jobs(specs):
     """Take a set of release specs and generate a list of "stages", where the
     jobs in any stage are dependent only on jobs in previous stages.  This
     allows us to maximize build parallelism within the gitlab-ci framework.

     Arguments:
         specs (Iterable): Specs to build
-        check_index_only (bool): Regardless of whether DAG pruning is enabled,
-            all configured mirrors are searched to see if binaries for specs
-            are up to date on those mirrors.  This flag limits that search to
-            the binary cache indices on those mirrors to speed the process up,
-            even though there is no garantee the index is up to date.
-        mirrors_to_checK: Optional mapping giving mirrors to check instead of
-            any configured mirrors.

     Returns: A tuple of information objects describing the specs, dependencies
         and stages:

-        spec_labels: A dictionary mapping the spec labels which are made of
-            (pkg-name/hash-prefix), to objects containing "spec" and "needs_rebuild"
-            keys.  The root spec is the spec of which this spec is a dependency
-            and the spec is the formatted spec string for this spec.
+        spec_labels: A dictionary mapping the spec labels (which are formatted
+            as pkg-name/hash-prefix) to concrete specs.

         deps: A dictionary where the keys should also have appeared as keys in
             the spec_labels dictionary, and the values are the set of
@@ -210,13 +194,7 @@ def _remove_satisfied_deps(deps, satisfied_list):
     deps = {}
     spec_labels = {}

-    _get_spec_dependencies(
-        specs,
-        deps,
-        spec_labels,
-        check_index_only=check_index_only,
-        mirrors_to_check=mirrors_to_check,
-    )
+    _get_spec_dependencies(specs, deps, spec_labels)

     # Save the original deps, as we need to return them at the end of the
     # function.  In the while loop below, the "dependencies" variable is
@@ -242,24 +220,36 @@ def _remove_satisfied_deps(deps, satisfied_list):
     return spec_labels, deps, stages


-def _print_staging_summary(spec_labels, dependencies, stages):
+def _print_staging_summary(spec_labels, stages, mirrors_to_check, rebuild_decisions):
     if not stages:
         return

-    tty.msg("  Staging summary ([x] means a job needs rebuilding):")
+    mirrors = spack.mirror.MirrorCollection(mirrors=mirrors_to_check)
+    tty.msg("Checked the following mirrors for binaries:")
+    for m in mirrors.values():
+        tty.msg("  {0}".format(m.fetch_url))
+
+    tty.msg("Staging summary ([x] means a job needs rebuilding):")
     for stage_index, stage in enumerate(stages):
         tty.msg("  stage {0} ({1} jobs):".format(stage_index, len(stage)))

         for job in sorted(stage):
-            s = spec_labels[job]["spec"]
+            s = spec_labels[job]
+            rebuild = rebuild_decisions[job].rebuild
+            reason = rebuild_decisions[job].reason
+            reason_msg = " ({0})".format(reason) if reason else ""
             tty.msg(
-                "    [{1}] {0} -> {2}".format(
-                    job, "x" if spec_labels[job]["needs_rebuild"] else " ", _get_spec_string(s)
+                "    [{1}] {0} -> {2}{3}".format(
+                    job, "x" if rebuild else " ", _get_spec_string(s), reason_msg
                 )
             )
+            if rebuild_decisions[job].mirrors:
+                tty.msg("      found on the following mirrors:")
+                for murl in rebuild_decisions[job].mirrors:
+                    tty.msg("        {0}".format(murl))


-def _compute_spec_deps(spec_list, check_index_only=False, mirrors_to_check=None):
+def _compute_spec_deps(spec_list):
     """
     Computes all the dependencies for the spec(s) and generates a JSON
     object which provides both a list of unique spec names as well as a
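For illustration, given the message formats added above, the new summary
might print something like the following (package names, hash prefixes,
spec strings, and the mirror URL are invented for this sketch, and tty
message prefixes are omitted):

Checked the following mirrors for binaries:
  https://mirror.example.com/buildcache
Staging summary ([x] means a job needs rebuilding):
  stage 0 (2 jobs):
    [x] zlib/abc1234 -> zlib@1.2.13 (Scheduled, not found anywhere)
    [ ] cmake/def5678 -> cmake@3.26.3 (Pruned, up-to-date)
      found on the following mirrors:
        https://mirror.example.com/buildcache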
@@ -323,12 +313,8 @@ def append_dep(s, d):
             tty.msg("Will not stage external pkg: {0}".format(s))
             continue

-        up_to_date_mirrors = bindist.get_mirrors_for_spec(
-            spec=s, mirrors_to_check=mirrors_to_check, index_only=check_index_only
-        )
-
         skey = _spec_deps_key(s)
-        spec_labels[skey] = {"spec": s, "needs_rebuild": not up_to_date_mirrors}
+        spec_labels[skey] = s

         for d in s.dependencies(deptype=all):
             dkey = _spec_deps_key(d)
@@ -338,14 +324,8 @@ def append_dep(s, d):
             append_dep(skey, dkey)

-    for spec_label, spec_holder in spec_labels.items():
-        specs.append(
-            {
-                "label": spec_label,
-                "spec": spec_holder["spec"],
-                "needs_rebuild": spec_holder["needs_rebuild"],
-            }
-        )
+    for spec_label, concrete_spec in spec_labels.items():
+        specs.append({"label": spec_label, "spec": concrete_spec})

     deps_json_obj = {"specs": specs, "dependencies": dependencies}
@@ -357,14 +337,14 @@ def _spec_matches(spec, match_string):
 def _format_job_needs(
-    dep_jobs, osname, build_group, prune_dag, stage_spec_dict, enable_artifacts_buildcache
+    dep_jobs, osname, build_group, prune_dag, rebuild_decisions, enable_artifacts_buildcache
 ):
     needs_list = []
     for dep_job in dep_jobs:
         dep_spec_key = _spec_deps_key(dep_job)
-        dep_spec_info = stage_spec_dict[dep_spec_key]
+        rebuild = rebuild_decisions[dep_spec_key].rebuild

-        if not prune_dag or dep_spec_info["needs_rebuild"]:
+        if not prune_dag or rebuild:
             needs_list.append(
                 {
                     "job": get_job_name(dep_job, dep_job.architecture, build_group),
@@ -470,8 +450,7 @@ def get_spec_filter_list(env, affected_pkgs, dependent_traverse_depth=None):
 def _build_jobs(spec_labels, stages):
     for stage_jobs in stages:
         for spec_label in stage_jobs:
-            spec_record = spec_labels[spec_label]
-            release_spec = spec_record["spec"]
+            release_spec = spec_labels[spec_label]
             release_spec_dag_hash = release_spec.dag_hash()
             yield release_spec, release_spec_dag_hash
@@ -492,6 +471,13 @@ def _unpack_script(script_section, op=_noop):
     return script


+class RebuildDecision(object):
+    def __init__(self):
+        self.rebuild = True
+        self.mirrors = []
+        self.reason = ""
+
+
 class SpackCI:
     """Spack CI object used to generate intermediate representation
     used by the CI generator(s).
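To make the new record concrete, here is a minimal standalone sketch of
how the generator below fills one in, using the RebuildDecision class
added above (the spec label and the mirror result are hypothetical
stand-ins; the field assignments mirror the diff):

record = RebuildDecision()  # defaults: rebuild=True, mirrors=[], reason=""
up_to_date_mirrors = [{"mirror_url": "https://mirror.example.com/buildcache"}]

record.rebuild = not up_to_date_mirrors  # any mirror hit means no rebuild needed
if up_to_date_mirrors:
    record.reason = "Pruned, found in mirrors"
    record.mirrors = [m["mirror_url"] for m in up_to_date_mirrors]
else:
    record.reason = "Scheduled, not found anywhere"

rebuild_decisions = {"zlib/abc1234": record}  # keyed by spec label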
@@ -944,23 +930,13 @@ def generate_gitlab_ci_yaml(
     except bindist.FetchCacheError as e:
         tty.warn(e)

-    try:
-        concrete_env_specs = [
-            concrete
-            for abstract, concrete in env.concretized_specs()
-            if abstract in env.spec_lists["specs"]
-        ]
-        spec_labels, dependencies, stages = stage_spec_jobs(
-            concrete_env_specs,
-            check_index_only=check_index_only,
-            mirrors_to_check=mirrors_to_check,
-        )
-    finally:
-        # Clean up remote mirror override if enabled
-        if remote_mirror_override:
-            spack.mirror.remove("ci_pr_mirror", cfg.default_modify_scope())
-        if spack_pipeline_type == "spack_pull_request":
-            spack.mirror.remove("ci_shared_pr_mirror", cfg.default_modify_scope())
+    spec_labels, dependencies, stages = stage_spec_jobs(
+        [
+            concrete
+            for abstract, concrete in env.concretized_specs()
+            if abstract in env.spec_lists["specs"]
+        ]
+    )

     all_job_names = []
     output_object = {}
@@ -986,26 +962,37 @@ def generate_gitlab_ci_yaml(
     spack_ci = SpackCI(ci_config, spec_labels, stages)
     spack_ci_ir = spack_ci.generate_ir()

+    rebuild_decisions = {}
+
     for stage_jobs in stages:
         stage_name = "stage-{0}".format(stage_id)
         stage_names.append(stage_name)
         stage_id += 1

         for spec_label in stage_jobs:
-            spec_record = spec_labels[spec_label]
-            release_spec = spec_record["spec"]
+            release_spec = spec_labels[spec_label]
             release_spec_dag_hash = release_spec.dag_hash()

+            spec_record = RebuildDecision()
+            rebuild_decisions[spec_label] = spec_record
+
             if prune_untouched_packages:
                 if release_spec not in affected_specs:
-                    tty.debug(
-                        "Pruning {0}/{1}, untouched by change.".format(
-                            release_spec.name, release_spec.dag_hash()[:7]
-                        )
-                    )
-                    spec_record["needs_rebuild"] = False
+                    spec_record.rebuild = False
+                    spec_record.reason = "Pruned, untouched by change."
                     continue

+            up_to_date_mirrors = bindist.get_mirrors_for_spec(
+                spec=release_spec, mirrors_to_check=mirrors_to_check, index_only=check_index_only
+            )
+
+            spec_record.rebuild = not up_to_date_mirrors
+            if up_to_date_mirrors:
+                spec_record.reason = "Pruned, found in mirrors"
+                spec_record.mirrors = [m["mirror_url"] for m in up_to_date_mirrors]
+            else:
+                spec_record.reason = "Scheduled, not found anywhere"
+
             job_object = spack_ci_ir["jobs"][release_spec_dag_hash]["attributes"]
             if not job_object:
@@ -1054,7 +1041,7 @@ def main_script_replacements(cmd):
                     # purposes, so we only get the direct dependencies.
                     dep_jobs = []
                     for dep_label in dependencies[spec_label]:
-                        dep_jobs.append(spec_labels[dep_label]["spec"])
+                        dep_jobs.append(spec_labels[dep_label])

                 job_object["needs"].extend(
                     _format_job_needs(
@@ -1062,20 +1049,24 @@ def main_script_replacements(cmd):
                         osname,
                         build_group,
                         prune_dag,
-                        spec_labels,
+                        rebuild_decisions,
                         enable_artifacts_buildcache,
                     )
                 )

-            rebuild_spec = spec_record["needs_rebuild"]
+            rebuild_spec = spec_record.rebuild

-            if prune_dag and not rebuild_spec and not copy_only_pipeline:
-                tty.debug(
-                    "Pruning {0}/{1}, does not need rebuild.".format(
-                        release_spec.name, release_spec.dag_hash()
-                    )
-                )
-                continue
+            if not rebuild_spec and not copy_only_pipeline:
+                if prune_dag:
+                    spec_record.reason = "Pruned, up-to-date"
+                    continue
+                else:
+                    # DAG pruning is disabled, force the spec to rebuild. The
+                    # record still contains any mirrors on which the spec
+                    # may have been found, so we can print them in the staging
+                    # summary.
+                    spec_record.rebuild = True
+                    spec_record.reason = "Scheduled, DAG pruning disabled"

             if broken_spec_urls is not None and release_spec_dag_hash in broken_spec_urls:
                 known_broken_specs_encountered.append(release_spec_dag_hash)
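Putting the branches above together, each spec ends up with exactly one
of the reason strings this commit introduces. A condensed restatement as
a standalone function (boolean inputs are assumed, and copy-only
pipelines, which skip the final pruning step, are ignored here):

def final_decision(pruned_untouched, found_on_mirrors, prune_dag):
    # Returns (rebuild, reason) as ultimately recorded on the RebuildDecision.
    if pruned_untouched:
        return False, "Pruned, untouched by change."
    if not found_on_mirrors:
        return True, "Scheduled, not found anywhere"
    if prune_dag:
        return False, "Pruned, up-to-date"
    return True, "Scheduled, DAG pruning disabled"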
@@ -1115,6 +1106,8 @@ def main_script_replacements(cmd):
                     {"job": generate_job_name, "pipeline": "{0}".format(parent_pipeline_id)}
                 )

+            # Let downstream jobs know whether the spec needed rebuilding, regardless
+            # whether DAG pruning was enabled or not.
             job_vars["SPACK_SPEC_NEEDS_REBUILD"] = str(rebuild_spec)

             if cdash_handler:
@@ -1165,7 +1158,13 @@ def main_script_replacements(cmd):
             job_id += 1

     if print_summary:
-        _print_staging_summary(spec_labels, dependencies, stages)
+        _print_staging_summary(spec_labels, stages, mirrors_to_check, rebuild_decisions)
+
+    # Clean up remote mirror override if enabled
+    if remote_mirror_override:
+        spack.mirror.remove("ci_pr_mirror", cfg.default_modify_scope())
+    if spack_pipeline_type == "spack_pull_request":
+        spack.mirror.remove("ci_shared_pr_mirror", cfg.default_modify_scope())

     tty.debug("{0} build jobs generated in {1} stages".format(job_id, stage_id))