From fe5d7881f5a5ec72e2ffbeccc647968dee44214e Mon Sep 17 00:00:00 2001
From: Harmen Stoppels
Date: Thu, 19 Sep 2024 10:34:23 +0200
Subject: [PATCH] avoid multiprocessing in tests (#46439)

- silences a few pytest warnings related to forking in xdist
- speeds up a few tests / avoids possible oversubscription in xdist
---
 lib/spack/spack/detection/path.py |  3 ++-
 lib/spack/spack/stage.py          |  4 ++--
 lib/spack/spack/test/conftest.py  |  8 +++++---
 lib/spack/spack/util/parallel.py  | 19 ++++++++++++-------
 lib/spack/spack/util/web.py       |  4 ++--
 5 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/lib/spack/spack/detection/path.py b/lib/spack/spack/detection/path.py
index 3588d96115c..7316d9124a7 100644
--- a/lib/spack/spack/detection/path.py
+++ b/lib/spack/spack/detection/path.py
@@ -23,6 +23,7 @@
 import spack.util.environment
 import spack.util.environment as environment
 import spack.util.ld_so_conf
+import spack.util.parallel
 
 from .common import (
     WindowsCompilerExternalPaths,
@@ -407,7 +408,7 @@ def by_path(
     result = collections.defaultdict(list)
     repository = spack.repo.PATH.ensure_unwrapped()
 
-    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
+    with spack.util.parallel.make_concurrent_executor(max_workers, require_fork=False) as executor:
         for pkg in packages_to_search:
             executable_future = executor.submit(
                 executables_finder.find,
diff --git a/lib/spack/spack/stage.py b/lib/spack/spack/stage.py
index 776b3ce6b94..ee3b4e95e54 100644
--- a/lib/spack/spack/stage.py
+++ b/lib/spack/spack/stage.py
@@ -2,7 +2,6 @@
 # Spack Project Developers. See the top-level COPYRIGHT file for details.
 #
 # SPDX-License-Identifier: (Apache-2.0 OR MIT)
-import concurrent.futures
 import errno
 import getpass
 import glob
@@ -40,6 +39,7 @@
 import spack.spec
 import spack.util.crypto
 import spack.util.lock
+import spack.util.parallel
 import spack.util.path as sup
 import spack.util.pattern as pattern
 import spack.util.url as url_util
@@ -1132,7 +1132,7 @@ def get_checksums_for_versions(
         if checksum is not None:
             version_hashes[version] = checksum
 
-    with concurrent.futures.ProcessPoolExecutor(max_workers=concurrency) as executor:
+    with spack.util.parallel.make_concurrent_executor(concurrency, require_fork=False) as executor:
         results = []
         for url, version in search_arguments:
             future = executor.submit(_fetch_and_checksum, url, fetch_options, keep_stage)
diff --git a/lib/spack/spack/test/conftest.py b/lib/spack/spack/test/conftest.py
index 607844265ff..2726061cb3f 100644
--- a/lib/spack/spack/test/conftest.py
+++ b/lib/spack/spack/test/conftest.py
@@ -1980,12 +1980,14 @@ def pytest_runtest_setup(item):
         pytest.skip(*not_on_windows_marker.args)
 
 
+def _sequential_executor(*args, **kwargs):
+    return spack.util.parallel.SequentialExecutor()
+
+
 @pytest.fixture(autouse=True)
 def disable_parallel_buildcache_push(monkeypatch):
     """Disable process pools in tests."""
-    monkeypatch.setattr(
-        spack.util.parallel, "make_concurrent_executor", spack.util.parallel.SequentialExecutor
-    )
+    monkeypatch.setattr(spack.util.parallel, "make_concurrent_executor", _sequential_executor)
 
 
 def _root_path(x, y, *, path):
diff --git a/lib/spack/spack/util/parallel.py b/lib/spack/spack/util/parallel.py
index 9bbdf5dd7a3..b16adb88582 100644
--- a/lib/spack/spack/util/parallel.py
+++ b/lib/spack/spack/util/parallel.py
@@ -9,7 +9,7 @@
 import traceback
 from typing import Optional
 
-from spack.config import determine_number_of_jobs
+import spack.config
 
 
 class ErrorFromWorker:
@@ -98,9 +98,14 @@ def submit(self, fn, *args, **kwargs):
         return future
 
 
-def make_concurrent_executor() -> concurrent.futures.Executor:
-    """Can't use threading because it's unsafe, and can't use spawned processes because of globals.
-    That leaves only forking."""
-    if multiprocessing.get_start_method() == "fork":
-        return concurrent.futures.ProcessPoolExecutor(determine_number_of_jobs(parallel=True))
-    return SequentialExecutor()
+def make_concurrent_executor(
+    jobs: Optional[int] = None, *, require_fork: bool = True
+) -> concurrent.futures.Executor:
+    """Create a concurrent executor. If require_fork is True, then the executor is sequential
+    if the platform does not enable forking as the default start method. Effectively
+    require_fork=True makes the executor sequential in the current process on Windows, macOS, and
+    Linux from Python 3.14+ (which changes defaults)"""
+    if require_fork and multiprocessing.get_start_method() != "fork":
+        return SequentialExecutor()
+    jobs = jobs or spack.config.determine_number_of_jobs(parallel=True)
+    return concurrent.futures.ProcessPoolExecutor(jobs)
diff --git a/lib/spack/spack/util/web.py b/lib/spack/spack/util/web.py
index 9a0f1d6e4b6..892b64d333d 100644
--- a/lib/spack/spack/util/web.py
+++ b/lib/spack/spack/util/web.py
@@ -4,7 +4,6 @@
 # SPDX-License-Identifier: (Apache-2.0 OR MIT)
 
 import codecs
-import concurrent.futures
 import email.message
 import errno
 import json
@@ -30,6 +29,7 @@
 import spack.config
 import spack.error
 import spack.util.executable
+import spack.util.parallel
 import spack.util.path
 import spack.util.url as url_util
 
@@ -641,7 +641,7 @@ def spider(
         root = urllib.parse.urlparse(root_str)
         spider_args.append((root, go_deeper, _visited))
 
-    with concurrent.futures.ProcessPoolExecutor(max_workers=concurrency) as tp:
+    with spack.util.parallel.make_concurrent_executor(concurrency, require_fork=False) as tp:
         while current_depth <= depth:
             tty.debug(
                 f"SPIDER: [depth={current_depth}, max_depth={depth}, urls={len(spider_args)}]"
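
Note (not part of the patch): a minimal sketch of how a caller might use the
reworked helper. The `square` worker, the job count of 4, and the standalone
script framing are illustrative assumptions, not code from this change:

    import spack.util.parallel


    def square(x: int) -> int:
        # Hypothetical worker: a picklable, module-level function, so it also
        # works when the pool uses the "spawn" start method.
        return x * x


    if __name__ == "__main__":  # guard required when workers are spawned, not forked
        # require_fork=False: always returns a ProcessPoolExecutor, even on
        # platforms where "fork" is not the default start method. With the
        # default require_fork=True, the helper returns a SequentialExecutor
        # there instead. Passing jobs=None falls back to
        # spack.config.determine_number_of_jobs(parallel=True).
        with spack.util.parallel.make_concurrent_executor(4, require_fork=False) as executor:
            futures = [executor.submit(square, n) for n in range(8)]
            print([f.result() for f in futures])  # results in submission order

With the default require_fork=True, a caller that depends on fork-inherited
global state degrades gracefully to in-process execution rather than failing
under spawn; the three call sites changed above opt out with
require_fork=False, presumably because their workers do not rely on such
state.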