oci: only push in parallel when forking (#42143)
This commit is contained in:
		| @@ -7,13 +7,14 @@ | |||||||
| import glob | import glob | ||||||
| import hashlib | import hashlib | ||||||
| import json | import json | ||||||
|  | import multiprocessing | ||||||
| import multiprocessing.pool | import multiprocessing.pool | ||||||
| import os | import os | ||||||
| import shutil | import shutil | ||||||
| import sys | import sys | ||||||
| import tempfile | import tempfile | ||||||
| import urllib.request | import urllib.request | ||||||
| from typing import Dict, List, Optional, Tuple | from typing import Dict, List, Optional, Tuple, Union | ||||||
| 
 | 
 | ||||||
| import llnl.util.tty as tty | import llnl.util.tty as tty | ||||||
| from llnl.string import plural | from llnl.string import plural | ||||||
| @@ -326,8 +327,30 @@ def _progress(i: int, total: int): | |||||||
|     return "" |     return "" | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def _make_pool(): | class NoPool: | ||||||
|  |     def map(self, func, args): | ||||||
|  |         return [func(a) for a in args] | ||||||
|  | 
 | ||||||
|  |     def starmap(self, func, args): | ||||||
|  |         return [func(*a) for a in args] | ||||||
|  | 
 | ||||||
|  |     def __enter__(self): | ||||||
|  |         return self | ||||||
|  | 
 | ||||||
|  |     def __exit__(self, *args): | ||||||
|  |         pass | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | MaybePool = Union[multiprocessing.pool.Pool, NoPool] | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def _make_pool() -> MaybePool: | ||||||
|  |     """Can't use threading because it's unsafe, and can't use spawned processes because of globals. | ||||||
|  |     That leaves only forking""" | ||||||
|  |     if multiprocessing.get_start_method() == "fork": | ||||||
|         return multiprocessing.pool.Pool(determine_number_of_jobs(parallel=True)) |         return multiprocessing.pool.Pool(determine_number_of_jobs(parallel=True)) | ||||||
|  |     else: | ||||||
|  |         return NoPool() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def push_fn(args): | def push_fn(args): | ||||||
| @@ -663,7 +686,7 @@ def _push_oci( | |||||||
|     base_image: Optional[ImageReference], |     base_image: Optional[ImageReference], | ||||||
|     installed_specs_with_deps: List[Spec], |     installed_specs_with_deps: List[Spec], | ||||||
|     tmpdir: str, |     tmpdir: str, | ||||||
|     pool: multiprocessing.pool.Pool, |     pool: MaybePool, | ||||||
|     force: bool = False, |     force: bool = False, | ||||||
| ) -> Tuple[List[str], Dict[str, Tuple[dict, dict]], Dict[str, spack.oci.oci.Blob]]: | ) -> Tuple[List[str], Dict[str, Tuple[dict, dict]], Dict[str, spack.oci.oci.Blob]]: | ||||||
|     """Push specs to an OCI registry |     """Push specs to an OCI registry | ||||||
| @@ -779,9 +802,7 @@ def _config_from_tag(image_ref: ImageReference, tag: str) -> Optional[dict]: | |||||||
|     return config if "spec" in config else None |     return config if "spec" in config else None | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def _update_index_oci( | def _update_index_oci(image_ref: ImageReference, tmpdir: str, pool: MaybePool) -> None: | ||||||
|     image_ref: ImageReference, tmpdir: str, pool: multiprocessing.pool.Pool |  | ||||||
| ) -> None: |  | ||||||
|     response = spack.oci.opener.urlopen(urllib.request.Request(url=image_ref.tags_url())) |     response = spack.oci.opener.urlopen(urllib.request.Request(url=image_ref.tags_url())) | ||||||
|     spack.oci.opener.ensure_status(response, 200) |     spack.oci.opener.ensure_status(response, 200) | ||||||
|     tags = json.load(response)["tags"] |     tags = json.load(response)["tags"] | ||||||
|   | |||||||
| @@ -1950,23 +1950,10 @@ def pytest_runtest_setup(item): | |||||||
|         pytest.skip(*not_on_windows_marker.args) |         pytest.skip(*not_on_windows_marker.args) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class MockPool: |  | ||||||
|     def map(self, func, args): |  | ||||||
|         return [func(a) for a in args] |  | ||||||
| 
 |  | ||||||
|     def starmap(self, func, args): |  | ||||||
|         return [func(*a) for a in args] |  | ||||||
| 
 |  | ||||||
|     def __enter__(self): |  | ||||||
|         return self |  | ||||||
| 
 |  | ||||||
|     def __exit__(self, *args): |  | ||||||
|         pass |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @pytest.fixture(scope="function") | @pytest.fixture(scope="function") | ||||||
| def disable_parallel_buildcache_push(monkeypatch): | def disable_parallel_buildcache_push(monkeypatch): | ||||||
|     monkeypatch.setattr(spack.cmd.buildcache, "_make_pool", MockPool) |     """Disable process pools in tests.""" | ||||||
|  |     monkeypatch.setattr(spack.cmd.buildcache, "_make_pool", spack.cmd.buildcache.NoPool) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def _root_path(x, y, *, path): | def _root_path(x, y, *, path): | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Harmen Stoppels
					Harmen Stoppels