avoid multiprocessing in tests (#46439)

- silences a few pytest warnings related to forking in xdist
- speeds up a few tests / avoids possible oversubscription in xdist
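
For context: CPython 3.12+ emits a DeprecationWarning when a process with more than one thread calls fork(), and pytest-xdist workers always carry helper threads, so a ProcessPoolExecutor created under the default "fork" start method inside a test trips the warning. A minimal standalone repro, not part of this commit (the helper thread and all names are illustrative):

# Run with `python -W default repro.py` on Linux + CPython >= 3.12; the
# DeprecationWarning is hidden by default but surfaced by pytest.
import concurrent.futures
import threading


def square(x):
    return x * x


if __name__ == "__main__":
    stop = threading.Event()
    helper = threading.Thread(target=stop.wait)  # second thread, like an xdist worker's
    helper.start()
    try:
        # Under the "fork" start method, worker processes are forked here while
        # `helper` is alive, so CPython warns:
        #   DeprecationWarning: This process (pid=N) is multi-threaded, use of
        #   fork() may lead to deadlocks in the child.
        with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
            print(pool.submit(square, 3).result())
    finally:
        stop.set()
        helper.join()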
Harmen Stoppels 2024-09-19 10:34:23 +02:00 committed by GitHub
parent 28e3295fb0
commit fe5d7881f5
5 changed files with 23 additions and 15 deletions


@@ -23,6 +23,7 @@
 import spack.util.environment
 import spack.util.environment as environment
 import spack.util.ld_so_conf
+import spack.util.parallel

 from .common import (
     WindowsCompilerExternalPaths,
@@ -407,7 +408,7 @@ def by_path(
     result = collections.defaultdict(list)
     repository = spack.repo.PATH.ensure_unwrapped()

-    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
+    with spack.util.parallel.make_concurrent_executor(max_workers, require_fork=False) as executor:
         for pkg in packages_to_search:
             executable_future = executor.submit(
                 executables_finder.find,


@@ -2,7 +2,6 @@
 # Spack Project Developers. See the top-level COPYRIGHT file for details.
 #
 # SPDX-License-Identifier: (Apache-2.0 OR MIT)
-import concurrent.futures
 import errno
 import getpass
 import glob
@@ -40,6 +39,7 @@
 import spack.spec
 import spack.util.crypto
 import spack.util.lock
+import spack.util.parallel
 import spack.util.path as sup
 import spack.util.pattern as pattern
 import spack.util.url as url_util
@@ -1132,7 +1132,7 @@ def get_checksums_for_versions(
         if checksum is not None:
             version_hashes[version] = checksum

-    with concurrent.futures.ProcessPoolExecutor(max_workers=concurrency) as executor:
+    with spack.util.parallel.make_concurrent_executor(concurrency, require_fork=False) as executor:
         results = []
         for url, version in search_arguments:
             future = executor.submit(_fetch_and_checksum, url, fetch_options, keep_stage)


@@ -1980,12 +1980,14 @@ def pytest_runtest_setup(item):
         pytest.skip(*not_on_windows_marker.args)


+def _sequential_executor(*args, **kwargs):
+    return spack.util.parallel.SequentialExecutor()
+
+
 @pytest.fixture(autouse=True)
 def disable_parallel_buildcache_push(monkeypatch):
     """Disable process pools in tests."""
-    monkeypatch.setattr(
-        spack.util.parallel, "make_concurrent_executor", spack.util.parallel.SequentialExecutor
-    )
+    monkeypatch.setattr(spack.util.parallel, "make_concurrent_executor", _sequential_executor)


 def _root_path(x, y, *, path):
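
The `_sequential_executor` wrapper above is needed because `make_concurrent_executor` now takes arguments: the old fixture patched the `SequentialExecutor` class in directly, which worked while the function took none, but would now forward e.g. `(concurrency, require_fork=False)` into a constructor that accepts neither. A sketch of the failure mode, assuming `SequentialExecutor.__init__` takes no arguments (implied by the diff but not shown):

# Illustrative stand-ins only; the real classes live in spack.util.parallel.
class SequentialExecutor:
    pass

# Old-style patch: the class itself replaces make_concurrent_executor.
make_concurrent_executor = SequentialExecutor
# make_concurrent_executor(4, require_fork=False)  # TypeError: takes no arguments

# New-style patch: a wrapper that accepts and ignores any call signature.
def _sequential_executor(*args, **kwargs):
    return SequentialExecutor()

assert isinstance(_sequential_executor(4, require_fork=False), SequentialExecutor)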


@@ -9,7 +9,7 @@
 import traceback
 from typing import Optional

-from spack.config import determine_number_of_jobs
+import spack.config


 class ErrorFromWorker:
@@ -98,9 +98,14 @@ def submit(self, fn, *args, **kwargs):
         return future


-def make_concurrent_executor() -> concurrent.futures.Executor:
-    """Can't use threading because it's unsafe, and can't use spawned processes because of globals.
-    That leaves only forking."""
-    if multiprocessing.get_start_method() == "fork":
-        return concurrent.futures.ProcessPoolExecutor(determine_number_of_jobs(parallel=True))
-    return SequentialExecutor()
+def make_concurrent_executor(
+    jobs: Optional[int] = None, *, require_fork: bool = True
+) -> concurrent.futures.Executor:
+    """Create a concurrent executor. If require_fork is True, the executor is sequential
+    when the platform does not use forking as the default start method; effectively,
+    require_fork=True makes the executor sequential in the current process on Windows, macOS,
+    and Linux from Python 3.14+ (which changes the default start method)."""
+    if require_fork and multiprocessing.get_start_method() != "fork":
+        return SequentialExecutor()
+    jobs = jobs or spack.config.determine_number_of_jobs(parallel=True)
+    return concurrent.futures.ProcessPoolExecutor(jobs)
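
`SequentialExecutor` itself is not part of this diff (only a fragment of a `submit` method shows up in the hunk context above). A minimal sketch of what such a drop-in executor can look like, assuming it only needs `submit` to satisfy the call sites:

import concurrent.futures


class SequentialExecutor(concurrent.futures.Executor):
    """Executor that runs each submission inline in the current process."""

    def submit(self, fn, *args, **kwargs):
        # Return an already-completed Future so callers can treat this
        # exactly like a ProcessPoolExecutor.
        future: concurrent.futures.Future = concurrent.futures.Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)
        return future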


@@ -4,7 +4,6 @@
 # SPDX-License-Identifier: (Apache-2.0 OR MIT)

 import codecs
-import concurrent.futures
 import email.message
 import errno
 import json
@@ -30,6 +29,7 @@
 import spack.config
 import spack.error
 import spack.util.executable
+import spack.util.parallel
 import spack.util.path
 import spack.util.url as url_util
@@ -641,7 +641,7 @@ def spider(
         root = urllib.parse.urlparse(root_str)
         spider_args.append((root, go_deeper, _visited))

-    with concurrent.futures.ProcessPoolExecutor(max_workers=concurrency) as tp:
+    with spack.util.parallel.make_concurrent_executor(concurrency, require_fork=False) as tp:
         while current_depth <= depth:
             tty.debug(
                 f"SPIDER: [depth={current_depth}, max_depth={depth}, urls={len(spider_args)}]"