Optimize marshaling of Repo and RepoPath (#45742)
When sending Repo and RepoPath over to a child process, we go through a marshaling procedure with pickle. The default behavior for these classes is highly inefficient, as it serializes a lot of specs that can just be reconstructed on the other end of the pipe. Here we write optimized procedures to __reduce__ both classes.
This commit is contained in:
		 Massimiliano Culpo
					Massimiliano Culpo
				
			
				
					committed by
					
						 GitHub
						GitHub
					
				
			
			
				
	
			
			
			 GitHub
						GitHub
					
				
			
						parent
						
							03a7da1e44
						
					
				
				
					commit
					94961ffe0a
				
			| @@ -12,7 +12,7 @@ | |||||||
| import re | import re | ||||||
| import sys | import sys | ||||||
| import warnings | import warnings | ||||||
| from typing import Dict, List, Optional, Set, Tuple, Type | from typing import Dict, Iterable, List, Optional, Set, Tuple, Type | ||||||
| 
 | 
 | ||||||
| import llnl.util.filesystem | import llnl.util.filesystem | ||||||
| import llnl.util.lang | import llnl.util.lang | ||||||
| @@ -187,7 +187,7 @@ def libraries_in_windows_paths(path_hints: Optional[List[str]] = None) -> Dict[s | |||||||
|     return path_to_dict(search_paths) |     return path_to_dict(search_paths) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def _group_by_prefix(paths: Set[str]) -> Dict[str, Set[str]]: | def _group_by_prefix(paths: List[str]) -> Dict[str, Set[str]]: | ||||||
|     groups = collections.defaultdict(set) |     groups = collections.defaultdict(set) | ||||||
|     for p in paths: |     for p in paths: | ||||||
|         groups[os.path.dirname(p)].add(p) |         groups[os.path.dirname(p)].add(p) | ||||||
| @@ -243,7 +243,9 @@ def detect_specs( | |||||||
|             return [] |             return [] | ||||||
| 
 | 
 | ||||||
|         result = [] |         result = [] | ||||||
|         for candidate_path, items_in_prefix in sorted(_group_by_prefix(set(paths)).items()): |         for candidate_path, items_in_prefix in _group_by_prefix( | ||||||
|  |             llnl.util.lang.dedupe(paths) | ||||||
|  |         ).items(): | ||||||
|             # TODO: multiple instances of a package can live in the same |             # TODO: multiple instances of a package can live in the same | ||||||
|             # prefix, and a package implementation can return multiple specs |             # prefix, and a package implementation can return multiple specs | ||||||
|             # for one prefix, but without additional details (e.g. about the |             # for one prefix, but without additional details (e.g. about the | ||||||
| @@ -299,19 +301,17 @@ def detect_specs( | |||||||
|         return result |         return result | ||||||
| 
 | 
 | ||||||
|     def find( |     def find( | ||||||
|         self, *, pkg_name: str, initial_guess: Optional[List[str]] = None |         self, *, pkg_name: str, repository, initial_guess: Optional[List[str]] = None | ||||||
|     ) -> List[DetectedPackage]: |     ) -> List[DetectedPackage]: | ||||||
|         """For a given package, returns a list of detected specs. |         """For a given package, returns a list of detected specs. | ||||||
| 
 | 
 | ||||||
|         Args: |         Args: | ||||||
|             pkg_name: package being detected |             pkg_name: package being detected | ||||||
|             initial_guess: initial list of paths to search from the caller |             repository: repository to retrieve the package | ||||||
|                            if None, default paths are searched. If this |             initial_guess: initial list of paths to search from the caller if None, default paths | ||||||
|                            is an empty list, nothing will be searched. |                 are searched. If this is an empty list, nothing will be searched. | ||||||
|         """ |         """ | ||||||
|         import spack.repo |         pkg_cls = repository.get_pkg_class(pkg_name) | ||||||
| 
 |  | ||||||
|         pkg_cls = spack.repo.PATH.get_pkg_class(pkg_name) |  | ||||||
|         patterns = self.search_patterns(pkg=pkg_cls) |         patterns = self.search_patterns(pkg=pkg_cls) | ||||||
|         if not patterns: |         if not patterns: | ||||||
|             return [] |             return [] | ||||||
| @@ -382,7 +382,7 @@ def prefix_from_path(self, *, path: str) -> str: | |||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def by_path( | def by_path( | ||||||
|     packages_to_search: List[str], |     packages_to_search: Iterable[str], | ||||||
|     *, |     *, | ||||||
|     path_hints: Optional[List[str]] = None, |     path_hints: Optional[List[str]] = None, | ||||||
|     max_workers: Optional[int] = None, |     max_workers: Optional[int] = None, | ||||||
| @@ -396,19 +396,28 @@ def by_path( | |||||||
|         path_hints: initial list of paths to be searched |         path_hints: initial list of paths to be searched | ||||||
|         max_workers: maximum number of workers to search for packages in parallel |         max_workers: maximum number of workers to search for packages in parallel | ||||||
|     """ |     """ | ||||||
|  |     import spack.repo | ||||||
|  | 
 | ||||||
|     # TODO: Packages should be able to define both .libraries and .executables in the future |     # TODO: Packages should be able to define both .libraries and .executables in the future | ||||||
|     # TODO: determine_spec_details should get all relevant libraries and executables in one call |     # TODO: determine_spec_details should get all relevant libraries and executables in one call | ||||||
|     executables_finder, libraries_finder = ExecutablesFinder(), LibrariesFinder() |     executables_finder, libraries_finder = ExecutablesFinder(), LibrariesFinder() | ||||||
|     detected_specs_by_package: Dict[str, Tuple[concurrent.futures.Future, ...]] = {} |     detected_specs_by_package: Dict[str, Tuple[concurrent.futures.Future, ...]] = {} | ||||||
| 
 | 
 | ||||||
|     result = collections.defaultdict(list) |     result = collections.defaultdict(list) | ||||||
|  |     repository = spack.repo.PATH.ensure_unwrapped() | ||||||
|     with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor: |     with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor: | ||||||
|         for pkg in packages_to_search: |         for pkg in packages_to_search: | ||||||
|             executable_future = executor.submit( |             executable_future = executor.submit( | ||||||
|                 executables_finder.find, pkg_name=pkg, initial_guess=path_hints |                 executables_finder.find, | ||||||
|  |                 pkg_name=pkg, | ||||||
|  |                 initial_guess=path_hints, | ||||||
|  |                 repository=repository, | ||||||
|             ) |             ) | ||||||
|             library_future = executor.submit( |             library_future = executor.submit( | ||||||
|                 libraries_finder.find, pkg_name=pkg, initial_guess=path_hints |                 libraries_finder.find, | ||||||
|  |                 pkg_name=pkg, | ||||||
|  |                 initial_guess=path_hints, | ||||||
|  |                 repository=repository, | ||||||
|             ) |             ) | ||||||
|             detected_specs_by_package[pkg] = executable_future, library_future |             detected_specs_by_package[pkg] = executable_future, library_future | ||||||
| 
 | 
 | ||||||
|   | |||||||
| @@ -149,12 +149,12 @@ def current_repository(self, value): | |||||||
|     @contextlib.contextmanager |     @contextlib.contextmanager | ||||||
|     def switch_repo(self, substitute: "RepoType"): |     def switch_repo(self, substitute: "RepoType"): | ||||||
|         """Switch the current repository list for the duration of the context manager.""" |         """Switch the current repository list for the duration of the context manager.""" | ||||||
|         old = self.current_repository |         old = self._repo | ||||||
|         try: |         try: | ||||||
|             self.current_repository = substitute |             self._repo = substitute | ||||||
|             yield |             yield | ||||||
|         finally: |         finally: | ||||||
|             self.current_repository = old |             self._repo = old | ||||||
| 
 | 
 | ||||||
|     def find_spec(self, fullname, python_path, target=None): |     def find_spec(self, fullname, python_path, target=None): | ||||||
|         # "target" is not None only when calling importlib.reload() |         # "target" is not None only when calling importlib.reload() | ||||||
| @@ -683,7 +683,7 @@ class RepoPath: | |||||||
|     def __init__( |     def __init__( | ||||||
|         self, |         self, | ||||||
|         *repos: Union[str, "Repo"], |         *repos: Union[str, "Repo"], | ||||||
|         cache: "spack.caches.FileCacheType", |         cache: Optional["spack.caches.FileCacheType"], | ||||||
|         overrides: Optional[Dict[str, Any]] = None, |         overrides: Optional[Dict[str, Any]] = None, | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         self.repos: List[Repo] = [] |         self.repos: List[Repo] = [] | ||||||
| @@ -696,6 +696,7 @@ def __init__( | |||||||
|         for repo in repos: |         for repo in repos: | ||||||
|             try: |             try: | ||||||
|                 if isinstance(repo, str): |                 if isinstance(repo, str): | ||||||
|  |                     assert cache is not None, "cache must hold a value, when repo is a string" | ||||||
|                     repo = Repo(repo, cache=cache, overrides=overrides) |                     repo = Repo(repo, cache=cache, overrides=overrides) | ||||||
|                 repo.finder(self) |                 repo.finder(self) | ||||||
|                 self.put_last(repo) |                 self.put_last(repo) | ||||||
| @@ -707,6 +708,10 @@ def __init__( | |||||||
|                     f"    spack repo rm {repo}", |                     f"    spack repo rm {repo}", | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|  |     def ensure_unwrapped(self) -> "RepoPath": | ||||||
|  |         """Ensure we unwrap this object from any dynamic wrapper (like Singleton)""" | ||||||
|  |         return self | ||||||
|  | 
 | ||||||
|     def put_first(self, repo: "Repo") -> None: |     def put_first(self, repo: "Repo") -> None: | ||||||
|         """Add repo first in the search path.""" |         """Add repo first in the search path.""" | ||||||
|         if isinstance(repo, RepoPath): |         if isinstance(repo, RepoPath): | ||||||
| @@ -930,6 +935,16 @@ def is_virtual_safe(self, pkg_name: str) -> bool: | |||||||
|     def __contains__(self, pkg_name): |     def __contains__(self, pkg_name): | ||||||
|         return self.exists(pkg_name) |         return self.exists(pkg_name) | ||||||
| 
 | 
 | ||||||
|  |     def marshal(self): | ||||||
|  |         return (self.repos,) | ||||||
|  | 
 | ||||||
|  |     @staticmethod | ||||||
|  |     def unmarshal(repos): | ||||||
|  |         return RepoPath(*repos, cache=None) | ||||||
|  | 
 | ||||||
|  |     def __reduce__(self): | ||||||
|  |         return RepoPath.unmarshal, self.marshal() | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| class Repo: | class Repo: | ||||||
|     """Class representing a package repository in the filesystem. |     """Class representing a package repository in the filesystem. | ||||||
| @@ -1319,6 +1334,20 @@ def __repr__(self) -> str: | |||||||
|     def __contains__(self, pkg_name: str) -> bool: |     def __contains__(self, pkg_name: str) -> bool: | ||||||
|         return self.exists(pkg_name) |         return self.exists(pkg_name) | ||||||
| 
 | 
 | ||||||
|  |     @staticmethod | ||||||
|  |     def unmarshal(root, cache, overrides): | ||||||
|  |         """Helper method to unmarshal keyword arguments""" | ||||||
|  |         return Repo(root, cache=cache, overrides=overrides) | ||||||
|  | 
 | ||||||
|  |     def marshal(self): | ||||||
|  |         cache = self._cache | ||||||
|  |         if isinstance(cache, llnl.util.lang.Singleton): | ||||||
|  |             cache = cache.instance | ||||||
|  |         return self.root, cache, self.overrides | ||||||
|  | 
 | ||||||
|  |     def __reduce__(self): | ||||||
|  |         return Repo.unmarshal, self.marshal() | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| RepoType = Union[Repo, RepoPath] | RepoType = Union[Repo, RepoPath] | ||||||
| 
 | 
 | ||||||
|   | |||||||
| @@ -14,6 +14,7 @@ | |||||||
| import spack.cmd.external | import spack.cmd.external | ||||||
| import spack.detection | import spack.detection | ||||||
| import spack.detection.path | import spack.detection.path | ||||||
|  | import spack.repo | ||||||
| from spack.main import SpackCommand | from spack.main import SpackCommand | ||||||
| from spack.spec import Spec | from spack.spec import Spec | ||||||
| 
 | 
 | ||||||
| @@ -55,7 +56,9 @@ def test_find_external_two_instances_same_package(mock_executable): | |||||||
|     search_paths = [str(cmake1.parent.parent), str(cmake2.parent.parent)] |     search_paths = [str(cmake1.parent.parent), str(cmake2.parent.parent)] | ||||||
| 
 | 
 | ||||||
|     finder = spack.detection.path.ExecutablesFinder() |     finder = spack.detection.path.ExecutablesFinder() | ||||||
|     detected_specs = finder.find(pkg_name="cmake", initial_guess=search_paths) |     detected_specs = finder.find( | ||||||
|  |         pkg_name="cmake", initial_guess=search_paths, repository=spack.repo.PATH | ||||||
|  |     ) | ||||||
| 
 | 
 | ||||||
|     assert len(detected_specs) == 2 |     assert len(detected_specs) == 2 | ||||||
|     spec_to_path = {e.spec: e.prefix for e in detected_specs} |     spec_to_path = {e.spec: e.prefix for e in detected_specs} | ||||||
| @@ -263,7 +266,9 @@ def _determine_variants(cls, exes, version_str): | |||||||
|     monkeypatch.setattr(gcc_cls, "determine_variants", _determine_variants) |     monkeypatch.setattr(gcc_cls, "determine_variants", _determine_variants) | ||||||
| 
 | 
 | ||||||
|     finder = spack.detection.path.ExecutablesFinder() |     finder = spack.detection.path.ExecutablesFinder() | ||||||
|     detected_specs = finder.find(pkg_name="gcc", initial_guess=[str(search_dir)]) |     detected_specs = finder.find( | ||||||
|  |         pkg_name="gcc", initial_guess=[str(search_dir)], repository=spack.repo.PATH | ||||||
|  |     ) | ||||||
| 
 | 
 | ||||||
|     assert len(detected_specs) == 1 |     assert len(detected_specs) == 1 | ||||||
| 
 | 
 | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user