llnl.util.filesystem.find: multiple entrypoints (#47436)
				
					
				
			You can now provide multiple roots to a single `find()` call and all of them will be searched. The roots can overlap (e.g. can be parents of one another). This also adds a library function for taking a set of regular expression patterns and creating a single OR expression (and that library function is used in `find` to improve its performance).
This commit is contained in:
		@@ -20,11 +20,11 @@
 | 
			
		||||
import tempfile
 | 
			
		||||
from contextlib import contextmanager
 | 
			
		||||
from itertools import accumulate
 | 
			
		||||
from typing import Callable, Iterable, List, Match, Optional, Tuple, Union
 | 
			
		||||
from typing import Callable, Deque, Dict, Iterable, List, Match, Optional, Set, Tuple, Union
 | 
			
		||||
 | 
			
		||||
import llnl.util.symlink
 | 
			
		||||
from llnl.util import tty
 | 
			
		||||
from llnl.util.lang import dedupe, memoized
 | 
			
		||||
from llnl.util.lang import dedupe, fnmatch_translate_multiple, memoized
 | 
			
		||||
from llnl.util.symlink import islink, readlink, resolve_link_target_relative_to_the_link, symlink
 | 
			
		||||
 | 
			
		||||
from ..path import path_to_os_path, system_path_filter
 | 
			
		||||
@@ -1673,32 +1673,40 @@ def find_first(root: str, files: Union[Iterable[str], str], bfs_depth: int = 2)
 | 
			
		||||
    return FindFirstFile(root, *files, bfs_depth=bfs_depth).find()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def find(root, files, recursive=True, max_depth: Optional[int] = None):
 | 
			
		||||
    """Search for ``files`` starting from the ``root`` directory.
 | 
			
		||||
 | 
			
		||||
    Like GNU/BSD find but written entirely in Python.
 | 
			
		||||
 | 
			
		||||
    Specifically this behaves like `find -type f`: it only returns
 | 
			
		||||
    results that are files. When searching recursively, this behaves
 | 
			
		||||
    as `find` with the `-L` option (follows symlinks).
 | 
			
		||||
def find(
 | 
			
		||||
    root: Union[str, List[str]],
 | 
			
		||||
    files: Union[str, List[str]],
 | 
			
		||||
    recursive: bool = True,
 | 
			
		||||
    max_depth: Optional[int] = None,
 | 
			
		||||
) -> List[str]:
 | 
			
		||||
    """Finds all non-directory files matching the filename patterns from ``files`` starting from
 | 
			
		||||
    ``root``. This function returns a deterministic result for the same input and directory
 | 
			
		||||
    structure when run multiple times. Symlinked directories are followed, and unique directories
 | 
			
		||||
    are searched only once. Each matching file is returned only once at lowest depth in case
 | 
			
		||||
    multiple paths exist due to symlinked directories. The function has similarities to the Unix
 | 
			
		||||
    ``find`` utility.
 | 
			
		||||
 | 
			
		||||
    Examples:
 | 
			
		||||
 | 
			
		||||
    .. code-block:: console
 | 
			
		||||
 | 
			
		||||
       $ find -L /usr -name python
 | 
			
		||||
       $ find -L /usr -name python3 -type f
 | 
			
		||||
 | 
			
		||||
    is equivalent to:
 | 
			
		||||
    is roughly equivalent to
 | 
			
		||||
 | 
			
		||||
    >>> find('/usr', 'python')
 | 
			
		||||
    >>> find("/usr", "python3")
 | 
			
		||||
 | 
			
		||||
    with the notable difference that this function only lists a single path to each file in case of
 | 
			
		||||
    symlinked directories.
 | 
			
		||||
 | 
			
		||||
    .. code-block:: console
 | 
			
		||||
 | 
			
		||||
       $ find /usr/local/bin -maxdepth 1 -name python
 | 
			
		||||
       $ find -L /usr/local/bin /usr/local/sbin -maxdepth 1 '(' -name python3 -o -name getcap \\
 | 
			
		||||
            ')' -type f
 | 
			
		||||
 | 
			
		||||
    is equivalent to:
 | 
			
		||||
    is roughly equivalent to:
 | 
			
		||||
 | 
			
		||||
    >>> find('/usr/local/bin', 'python', recursive=False)
 | 
			
		||||
    >>> find(["/usr/local/bin", "/usr/local/sbin"], ["python3", "getcap"], recursive=False)
 | 
			
		||||
 | 
			
		||||
    Accepts any glob characters accepted by fnmatch:
 | 
			
		||||
 | 
			
		||||
@@ -1712,17 +1720,17 @@ def find(root, files, recursive=True, max_depth: Optional[int] = None):
 | 
			
		||||
    ==========  ====================================
 | 
			
		||||
 | 
			
		||||
    Parameters:
 | 
			
		||||
        root (str): The root directory to start searching from
 | 
			
		||||
        files (str or collections.abc.Sequence): Library name(s) to search for
 | 
			
		||||
        recursive (bool): if False search only root folder,
 | 
			
		||||
            if True descends top-down from the root. Defaults to True.
 | 
			
		||||
        max_depth (int): if set, don't search below this depth. Cannot be set
 | 
			
		||||
            if recursive is False
 | 
			
		||||
        root: One or more root directories to start searching from
 | 
			
		||||
        files: One or more filename patterns to search for
 | 
			
		||||
        recursive: if False search only root, if True descends from roots. Defaults to True.
 | 
			
		||||
        max_depth: if set, don't search below this depth. Cannot be set if recursive is False
 | 
			
		||||
 | 
			
		||||
    Returns:
 | 
			
		||||
        list: The files that have been found
 | 
			
		||||
    Returns a list of absolute, matching file paths.
 | 
			
		||||
    """
 | 
			
		||||
    if isinstance(files, str):
 | 
			
		||||
    if not isinstance(root, list):
 | 
			
		||||
        root = [root]
 | 
			
		||||
 | 
			
		||||
    if not isinstance(files, list):
 | 
			
		||||
        files = [files]
 | 
			
		||||
 | 
			
		||||
    # If recursive is false, max_depth can only be None or 0
 | 
			
		||||
@@ -1734,10 +1742,9 @@ def find(root, files, recursive=True, max_depth: Optional[int] = None):
 | 
			
		||||
    elif max_depth is None:
 | 
			
		||||
        max_depth = sys.maxsize
 | 
			
		||||
 | 
			
		||||
    tty.debug(f"Find (max depth = {max_depth}): {root} {str(files)}")
 | 
			
		||||
    result = find_max_depth(root, files, max_depth)
 | 
			
		||||
 | 
			
		||||
    tty.debug(f"Find complete: {root} {str(files)}")
 | 
			
		||||
    tty.debug(f"Find (max depth = {max_depth}): {root} {files}")
 | 
			
		||||
    result = _find_max_depth(root, files, max_depth)
 | 
			
		||||
    tty.debug(f"Find complete: {root} {files}")
 | 
			
		||||
    return result
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -1746,56 +1753,36 @@ def _log_file_access_issue(e: OSError, path: str) -> None:
 | 
			
		||||
    tty.debug(f"find must skip {path}: {errno_name} {e}")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@system_path_filter(arg_slice=slice(1))
 | 
			
		||||
def find_max_depth(root, globs, max_depth: Optional[int] = None):
 | 
			
		||||
    """Given a set of non-recursive glob file patterns, finds all
 | 
			
		||||
    files matching those patterns up to a maximum specified depth.
 | 
			
		||||
def _dir_id(s: os.stat_result) -> Tuple[int, int]:
 | 
			
		||||
    # Note: on windows, st_ino is the file index and st_dev is the volume serial number. See
 | 
			
		||||
    # https://github.com/python/cpython/blob/3.9/Python/fileutils.c
 | 
			
		||||
    return (s.st_ino, s.st_dev)
 | 
			
		||||
 | 
			
		||||
    If a directory has a name which matches an input pattern, it will
 | 
			
		||||
    not be included in the results.
 | 
			
		||||
 | 
			
		||||
    If ``max_depth`` is specified, does not search below that depth.
 | 
			
		||||
def _find_max_depth(roots: List[str], globs: List[str], max_depth: int = sys.maxsize) -> List[str]:
 | 
			
		||||
    """See ``find`` for the public API."""
 | 
			
		||||
    # Apply normcase to file patterns and filenames to respect case insensitive filesystems
 | 
			
		||||
    regex, groups = fnmatch_translate_multiple([os.path.normcase(x) for x in globs])
 | 
			
		||||
    # Ordered dictionary that keeps track of the files found for each pattern
 | 
			
		||||
    capture_group_to_paths: Dict[str, List[str]] = {group: [] for group in groups}
 | 
			
		||||
    # Ensure returned paths are always absolute
 | 
			
		||||
    roots = [os.path.abspath(r) for r in roots]
 | 
			
		||||
    # Breadth-first search queue. Each element is a tuple of (depth, directory)
 | 
			
		||||
    dir_queue: Deque[Tuple[int, str]] = collections.deque()
 | 
			
		||||
    # Set of visited directories. Each element is a tuple of (inode, device)
 | 
			
		||||
    visited_dirs: Set[Tuple[int, int]] = set()
 | 
			
		||||
 | 
			
		||||
    If ``globs`` is a list, files matching earlier entries are placed
 | 
			
		||||
    in the return value before files matching later entries.
 | 
			
		||||
    """
 | 
			
		||||
    try:
 | 
			
		||||
        stat_root = os.stat(root)
 | 
			
		||||
    except OSError:
 | 
			
		||||
        return []
 | 
			
		||||
    for root in roots:
 | 
			
		||||
        try:
 | 
			
		||||
            stat_root = os.stat(root)
 | 
			
		||||
        except OSError as e:
 | 
			
		||||
            _log_file_access_issue(e, root)
 | 
			
		||||
            continue
 | 
			
		||||
        dir_id = _dir_id(stat_root)
 | 
			
		||||
        if dir_id not in visited_dirs:
 | 
			
		||||
            dir_queue.appendleft((0, root))
 | 
			
		||||
            visited_dirs.add(dir_id)
 | 
			
		||||
 | 
			
		||||
    if max_depth is None:
 | 
			
		||||
        max_depth = sys.maxsize
 | 
			
		||||
 | 
			
		||||
    if isinstance(globs, str):
 | 
			
		||||
        globs = [globs]
 | 
			
		||||
    # Apply normcase to regular expressions and to the filenames:
 | 
			
		||||
    # this respects case-sensitivity semantics of different OSes
 | 
			
		||||
    # (e.g. file search is typically case-insensitive on Windows)
 | 
			
		||||
    regexes = [re.compile(fnmatch.translate(os.path.normcase(x))) for x in globs]
 | 
			
		||||
 | 
			
		||||
    # Note later calls to os.scandir etc. return abspaths if the
 | 
			
		||||
    # input is absolute, see https://docs.python.org/3/library/os.html#os.DirEntry.path
 | 
			
		||||
    root = os.path.abspath(root)
 | 
			
		||||
 | 
			
		||||
    found_files = collections.defaultdict(list)
 | 
			
		||||
 | 
			
		||||
    def _dir_id(stat_info):
 | 
			
		||||
        # Note: on windows, st_ino is the file index and st_dev
 | 
			
		||||
        # is the volume serial number. See
 | 
			
		||||
        # https://github.com/python/cpython/blob/3.9/Python/fileutils.c
 | 
			
		||||
        return (stat_info.st_ino, stat_info.st_dev)
 | 
			
		||||
 | 
			
		||||
    visited_dirs = set([_dir_id(stat_root)])
 | 
			
		||||
 | 
			
		||||
    # Each queue item stores the depth and path
 | 
			
		||||
    # This achieves a consistent traversal order by iterating through
 | 
			
		||||
    # each directory in alphabetical order.
 | 
			
		||||
    # This also traverses in BFS order to ensure finding the shortest
 | 
			
		||||
    # path to any file (or one of the shortest paths, if there are
 | 
			
		||||
    # several - the one returned will be consistent given the prior
 | 
			
		||||
    # point).
 | 
			
		||||
    dir_queue = collections.deque([(0, root)])
 | 
			
		||||
    while dir_queue:
 | 
			
		||||
        depth, next_dir = dir_queue.pop()
 | 
			
		||||
        try:
 | 
			
		||||
@@ -1810,20 +1797,18 @@ def _dir_id(stat_info):
 | 
			
		||||
                try:
 | 
			
		||||
                    it_is_a_dir = dir_entry.is_dir(follow_symlinks=True)
 | 
			
		||||
                except OSError as e:
 | 
			
		||||
                    # Possible permission issue, or a symlink that cannot
 | 
			
		||||
                    # be resolved (ELOOP).
 | 
			
		||||
                    # Possible permission issue, or a symlink that cannot be resolved (ELOOP).
 | 
			
		||||
                    _log_file_access_issue(e, dir_entry.path)
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                if it_is_a_dir and (depth < max_depth):
 | 
			
		||||
                if it_is_a_dir and depth < max_depth:
 | 
			
		||||
                    try:
 | 
			
		||||
                        # The stat should be performed in a try/except block.
 | 
			
		||||
                        # We repeat that here vs. moving to the above block
 | 
			
		||||
                        # because we only want to call `stat` if we haven't
 | 
			
		||||
                        # exceeded our max_depth
 | 
			
		||||
                        # The stat should be performed in a try/except block. We repeat that here
 | 
			
		||||
                        # vs. moving to the above block because we only want to call `stat` if we
 | 
			
		||||
                        # haven't exceeded our max_depth
 | 
			
		||||
                        if sys.platform == "win32":
 | 
			
		||||
                            # Note: st_ino/st_dev on DirEntry.stat are not set on
 | 
			
		||||
                            # Windows, so we have to call os.stat
 | 
			
		||||
                            # Note: st_ino/st_dev on DirEntry.stat are not set on Windows, so we
 | 
			
		||||
                            # have to call os.stat
 | 
			
		||||
                            stat_info = os.stat(dir_entry.path, follow_symlinks=True)
 | 
			
		||||
                        else:
 | 
			
		||||
                            stat_info = dir_entry.stat(follow_symlinks=True)
 | 
			
		||||
@@ -1836,15 +1821,15 @@ def _dir_id(stat_info):
 | 
			
		||||
                        dir_queue.appendleft((depth + 1, dir_entry.path))
 | 
			
		||||
                        visited_dirs.add(dir_id)
 | 
			
		||||
                else:
 | 
			
		||||
                    fname = os.path.basename(dir_entry.path)
 | 
			
		||||
                    for pattern in regexes:
 | 
			
		||||
                        if pattern.match(os.path.normcase(fname)):
 | 
			
		||||
                            found_files[pattern].append(os.path.join(next_dir, fname))
 | 
			
		||||
                    m = regex.match(os.path.normcase(os.path.basename(dir_entry.path)))
 | 
			
		||||
                    if not m:
 | 
			
		||||
                        continue
 | 
			
		||||
                    for group in capture_group_to_paths:
 | 
			
		||||
                        if m.group(group):
 | 
			
		||||
                            capture_group_to_paths[group].append(dir_entry.path)
 | 
			
		||||
                            break
 | 
			
		||||
 | 
			
		||||
        # TODO: for fully-recursive searches, we can print a warning after
 | 
			
		||||
        # after having searched everything up to some fixed depth
 | 
			
		||||
 | 
			
		||||
    return list(itertools.chain(*[found_files[x] for x in regexes]))
 | 
			
		||||
    return [path for paths in capture_group_to_paths.values() for path in paths]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Utilities for libraries and headers
 | 
			
		||||
 
 | 
			
		||||
@@ -5,12 +5,14 @@
 | 
			
		||||
 | 
			
		||||
import collections.abc
 | 
			
		||||
import contextlib
 | 
			
		||||
import fnmatch
 | 
			
		||||
import functools
 | 
			
		||||
import itertools
 | 
			
		||||
import os
 | 
			
		||||
import re
 | 
			
		||||
import sys
 | 
			
		||||
import traceback
 | 
			
		||||
import typing
 | 
			
		||||
import warnings
 | 
			
		||||
from datetime import datetime, timedelta
 | 
			
		||||
from typing import Callable, Iterable, List, Tuple, TypeVar
 | 
			
		||||
@@ -859,6 +861,32 @@ def elide_list(line_list: List[str], max_num: int = 10) -> List[str]:
 | 
			
		||||
    return line_list
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if sys.version_info >= (3, 9):
 | 
			
		||||
    PatternStr = re.Pattern[str]
 | 
			
		||||
else:
 | 
			
		||||
    PatternStr = typing.Pattern[str]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def fnmatch_translate_multiple(patterns: List[str]) -> Tuple[PatternStr, List[str]]:
 | 
			
		||||
    """Same as fnmatch.translate, but creates a single regex of the form
 | 
			
		||||
    ``(?P<pattern0>...)|(?P<pattern1>...)|...`` for each pattern in the iterable, where
 | 
			
		||||
    ``patternN`` is a named capture group that matches the corresponding pattern translated by
 | 
			
		||||
    ``fnmatch.translate``. This can be used to match multiple patterns in a single pass. No case
 | 
			
		||||
    normalization is performed on the patterns.
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
        patterns: list of fnmatch patterns
 | 
			
		||||
 | 
			
		||||
    Returns:
 | 
			
		||||
        Tuple of the combined regex and the list of named capture groups corresponding to each
 | 
			
		||||
        pattern in the input list.
 | 
			
		||||
    """
 | 
			
		||||
    groups = [f"pattern{i}" for i in range(len(patterns))]
 | 
			
		||||
    regexes = (fnmatch.translate(p) for p in patterns)
 | 
			
		||||
    combined = re.compile("|".join(f"(?P<{g}>{r})" for g, r in zip(groups, regexes)))
 | 
			
		||||
    return combined, groups
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@contextlib.contextmanager
 | 
			
		||||
def nullcontext(*args, **kwargs):
 | 
			
		||||
    """Empty context manager.
 | 
			
		||||
 
 | 
			
		||||
@@ -1072,16 +1072,16 @@ def test_find_max_depth(dir_structure_with_things_to_find):
 | 
			
		||||
    # Make sure the paths we use to verify are absolute
 | 
			
		||||
    assert os.path.isabs(locations["file_one"])
 | 
			
		||||
 | 
			
		||||
    assert set(fs.find_max_depth(root, "file_*", 0)) == {locations["file_four"]}
 | 
			
		||||
    assert set(fs.find_max_depth(root, "file_*", 1)) == {
 | 
			
		||||
    assert set(fs.find(root, "file_*", max_depth=0)) == {locations["file_four"]}
 | 
			
		||||
    assert set(fs.find(root, "file_*", max_depth=1)) == {
 | 
			
		||||
        locations["file_one"],
 | 
			
		||||
        locations["file_three"],
 | 
			
		||||
        locations["file_four"],
 | 
			
		||||
    }
 | 
			
		||||
    assert set(fs.find_max_depth(root, "file_two", 2)) == {locations["file_two"]}
 | 
			
		||||
    assert not set(fs.find_max_depth(root, "file_two", 1))
 | 
			
		||||
    assert set(fs.find_max_depth(root, "file_two")) == {locations["file_two"]}
 | 
			
		||||
    assert set(fs.find_max_depth(root, "file_*")) == set(locations.values())
 | 
			
		||||
    assert set(fs.find(root, "file_two", max_depth=2)) == {locations["file_two"]}
 | 
			
		||||
    assert not set(fs.find(root, "file_two", max_depth=1))
 | 
			
		||||
    assert set(fs.find(root, "file_two")) == {locations["file_two"]}
 | 
			
		||||
    assert set(fs.find(root, "file_*")) == set(locations.values())
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_find_max_depth_relative(dir_structure_with_things_to_find):
 | 
			
		||||
@@ -1090,8 +1090,8 @@ def test_find_max_depth_relative(dir_structure_with_things_to_find):
 | 
			
		||||
    """
 | 
			
		||||
    root, locations = dir_structure_with_things_to_find
 | 
			
		||||
    with fs.working_dir(root):
 | 
			
		||||
        assert set(fs.find_max_depth(".", "file_*", 0)) == {locations["file_four"]}
 | 
			
		||||
        assert set(fs.find_max_depth(".", "file_two", 2)) == {locations["file_two"]}
 | 
			
		||||
        assert set(fs.find(".", "file_*", max_depth=0)) == {locations["file_four"]}
 | 
			
		||||
        assert set(fs.find(".", "file_two", max_depth=2)) == {locations["file_two"]}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize("recursive,max_depth", [(False, -1), (False, 1)])
 | 
			
		||||
@@ -1105,7 +1105,8 @@ def test_max_depth_and_recursive_errors(tmpdir, recursive, max_depth):
 | 
			
		||||
        fs.find_libraries(["some_lib"], root, recursive=recursive, max_depth=max_depth)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def dir_structure_with_things_to_find_links(tmpdir, use_junctions=False):
 | 
			
		||||
@pytest.fixture(params=[True, False])
 | 
			
		||||
def complex_dir_structure(request, tmpdir):
 | 
			
		||||
    """
 | 
			
		||||
    "lx-dy" means "level x, directory y"
 | 
			
		||||
    "lx-fy" means "level x, file y"
 | 
			
		||||
@@ -1128,8 +1129,11 @@ def dir_structure_with_things_to_find_links(tmpdir, use_junctions=False):
 | 
			
		||||
        l1-s3 -> l3-d4 # a link that "skips" a directory level
 | 
			
		||||
        l1-s4 -> l2-s3 # a link to a link to a dir
 | 
			
		||||
    """
 | 
			
		||||
    if sys.platform == "win32" and (not use_junctions) and (not _windows_can_symlink()):
 | 
			
		||||
    use_junctions = request.param
 | 
			
		||||
    if sys.platform == "win32" and not use_junctions and not _windows_can_symlink():
 | 
			
		||||
        pytest.skip("This Windows instance is not configured with symlink support")
 | 
			
		||||
    elif sys.platform != "win32" and use_junctions:
 | 
			
		||||
        pytest.skip("Junctions are a Windows-only feature")
 | 
			
		||||
 | 
			
		||||
    l1_d1 = tmpdir.join("l1-d1").ensure(dir=True)
 | 
			
		||||
    l2_d1 = l1_d1.join("l2-d1").ensure(dir=True)
 | 
			
		||||
@@ -1150,44 +1154,60 @@ def dir_structure_with_things_to_find_links(tmpdir, use_junctions=False):
 | 
			
		||||
    link_fn(l2_d2, l2_s3)
 | 
			
		||||
    link_fn(l2_s3, pathlib.Path(tmpdir) / "l1-s4")
 | 
			
		||||
 | 
			
		||||
    locations = {}
 | 
			
		||||
    locations["l4-f1"] = str(l3_d2.join("l4-f1").ensure())
 | 
			
		||||
    locations["l4-f2-full"] = str(l3_d4.join("l4-f2").ensure())
 | 
			
		||||
    locations["l4-f2-link"] = str(pathlib.Path(tmpdir) / "l1-s3" / "l4-f2")
 | 
			
		||||
    locations["l2-f1"] = str(l1_d2.join("l2-f1").ensure())
 | 
			
		||||
    locations["l2-f1-link"] = str(pathlib.Path(tmpdir) / "l1-d1" / "l2-d1" / "l3-s1" / "l2-f1")
 | 
			
		||||
    locations["l3-f3-full"] = str(l2_d2.join("l3-f3").ensure())
 | 
			
		||||
    locations["l3-f3-link-l1"] = str(pathlib.Path(tmpdir) / "l1-s4" / "l3-f3")
 | 
			
		||||
    locations = {
 | 
			
		||||
        "l4-f1": str(l3_d2.join("l4-f1").ensure()),
 | 
			
		||||
        "l4-f2-full": str(l3_d4.join("l4-f2").ensure()),
 | 
			
		||||
        "l4-f2-link": str(pathlib.Path(tmpdir) / "l1-s3" / "l4-f2"),
 | 
			
		||||
        "l2-f1": str(l1_d2.join("l2-f1").ensure()),
 | 
			
		||||
        "l2-f1-link": str(pathlib.Path(tmpdir) / "l1-d1" / "l2-d1" / "l3-s1" / "l2-f1"),
 | 
			
		||||
        "l3-f3-full": str(l2_d2.join("l3-f3").ensure()),
 | 
			
		||||
        "l3-f3-link-l1": str(pathlib.Path(tmpdir) / "l1-s4" / "l3-f3"),
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return str(tmpdir), locations
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _check_find_links(root, locations):
 | 
			
		||||
def test_find_max_depth_symlinks(complex_dir_structure):
 | 
			
		||||
    root, locations = complex_dir_structure
 | 
			
		||||
    root = pathlib.Path(root)
 | 
			
		||||
    assert set(fs.find_max_depth(root, "l4-f1")) == {locations["l4-f1"]}
 | 
			
		||||
    assert set(fs.find_max_depth(root / "l1-s3", "l4-f2", 0)) == {locations["l4-f2-link"]}
 | 
			
		||||
    assert set(fs.find_max_depth(root / "l1-d1", "l2-f1")) == {locations["l2-f1-link"]}
 | 
			
		||||
    assert set(fs.find(root, "l4-f1")) == {locations["l4-f1"]}
 | 
			
		||||
    assert set(fs.find(root / "l1-s3", "l4-f2", max_depth=0)) == {locations["l4-f2-link"]}
 | 
			
		||||
    assert set(fs.find(root / "l1-d1", "l2-f1")) == {locations["l2-f1-link"]}
 | 
			
		||||
    # File is accessible via symlink and subdir, the link path will be
 | 
			
		||||
    # searched first, and the directory will not be searched again when
 | 
			
		||||
    # it is encountered the second time (via not-link) in the traversal
 | 
			
		||||
    assert set(fs.find_max_depth(root, "l4-f2")) == {locations["l4-f2-link"]}
 | 
			
		||||
    assert set(fs.find(root, "l4-f2")) == {locations["l4-f2-link"]}
 | 
			
		||||
    # File is accessible only via the dir, so the full file path should
 | 
			
		||||
    # be reported
 | 
			
		||||
    assert set(fs.find_max_depth(root / "l1-d1", "l4-f2")) == {locations["l4-f2-full"]}
 | 
			
		||||
    assert set(fs.find(root / "l1-d1", "l4-f2")) == {locations["l4-f2-full"]}
 | 
			
		||||
    # Check following links to links
 | 
			
		||||
    assert set(fs.find_max_depth(root, "l3-f3")) == {locations["l3-f3-link-l1"]}
 | 
			
		||||
    assert set(fs.find(root, "l3-f3")) == {locations["l3-f3-link-l1"]}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    "use_junctions",
 | 
			
		||||
    [
 | 
			
		||||
        False,
 | 
			
		||||
        pytest.param(
 | 
			
		||||
            True,
 | 
			
		||||
            marks=pytest.mark.skipif(sys.platform != "win32", reason="Only Windows has junctions"),
 | 
			
		||||
        ),
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
def test_find_max_depth_symlinks(tmpdir, use_junctions):
 | 
			
		||||
    root, locations = dir_structure_with_things_to_find_links(tmpdir, use_junctions=use_junctions)
 | 
			
		||||
    _check_find_links(root, locations)
 | 
			
		||||
def test_find_max_depth_multiple_and_repeated_entry_points(complex_dir_structure):
 | 
			
		||||
    root, locations = complex_dir_structure
 | 
			
		||||
 | 
			
		||||
    fst = str(pathlib.Path(root) / "l1-d1" / "l2-d1")
 | 
			
		||||
    snd = str(pathlib.Path(root) / "l1-d2")
 | 
			
		||||
    nonexistent = str(pathlib.Path(root) / "nonexistent")
 | 
			
		||||
 | 
			
		||||
    assert set(fs.find([fst, snd, fst, snd, nonexistent], ["l*-f*"], max_depth=1)) == {
 | 
			
		||||
        locations["l2-f1"],
 | 
			
		||||
        locations["l4-f1"],
 | 
			
		||||
        locations["l4-f2-full"],
 | 
			
		||||
        locations["l3-f3-full"],
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_multiple_patterns(complex_dir_structure):
 | 
			
		||||
    root, _ = complex_dir_structure
 | 
			
		||||
    paths = fs.find(root, ["l2-f1", "l3-f3", "*"])
 | 
			
		||||
    # There shouldn't be duplicate results with multiple, overlapping patterns
 | 
			
		||||
    assert len(set(paths)) == len(paths)
 | 
			
		||||
    # All files should be found
 | 
			
		||||
    filenames = [os.path.basename(p) for p in paths]
 | 
			
		||||
    assert set(filenames) == {"l2-f1", "l3-f3", "l4-f1", "l4-f2"}
 | 
			
		||||
    # They are ordered by first matching pattern (this is a bit of an implementation detail,
 | 
			
		||||
    # and we could decide to change the exact order in the future)
 | 
			
		||||
    assert filenames[0] == "l2-f1"
 | 
			
		||||
    assert filenames[1] == "l3-f3"
 | 
			
		||||
 
 | 
			
		||||
@@ -373,3 +373,18 @@ class _SomeClass:
 | 
			
		||||
    _SomeClass.deprecated.error_lvl = 2
 | 
			
		||||
    with pytest.raises(AttributeError):
 | 
			
		||||
        _ = s.deprecated
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_fnmatch_multiple():
 | 
			
		||||
    regex, groups = llnl.util.lang.fnmatch_translate_multiple(["libf*o.so", "libb*r.so"])
 | 
			
		||||
 | 
			
		||||
    a = regex.match("libfoo.so")
 | 
			
		||||
    assert a and a.group(groups[0]) == "libfoo.so"
 | 
			
		||||
 | 
			
		||||
    b = regex.match("libbar.so")
 | 
			
		||||
    assert b and b.group(groups[1]) == "libbar.so"
 | 
			
		||||
 | 
			
		||||
    assert not regex.match("libfoo.so.1")
 | 
			
		||||
    assert not regex.match("libbar.so.1")
 | 
			
		||||
    assert not regex.match("libfoo.solibbar.so")
 | 
			
		||||
    assert not regex.match("libbaz.so")
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user