Revert "llnl.util.filesystem.find: multiple entrypoints (#47436)"

This reverts commit 73219e4b02.
This commit is contained in:
Harmen Stoppels 2024-11-07 20:53:26 +01:00 committed by Peter Scheibel
parent 0a4563fd02
commit 60ba61f6b2
4 changed files with 130 additions and 178 deletions

View File

@ -20,11 +20,11 @@
import tempfile
from contextlib import contextmanager
from itertools import accumulate
from typing import Callable, Deque, Dict, Iterable, List, Match, Optional, Set, Tuple, Union
from typing import Callable, Iterable, List, Match, Optional, Tuple, Union
import llnl.util.symlink
from llnl.util import tty
from llnl.util.lang import dedupe, fnmatch_translate_multiple, memoized
from llnl.util.lang import dedupe, memoized
from llnl.util.symlink import islink, readlink, resolve_link_target_relative_to_the_link, symlink
from ..path import path_to_os_path, system_path_filter
@ -1673,40 +1673,32 @@ def find_first(root: str, files: Union[Iterable[str], str], bfs_depth: int = 2)
return FindFirstFile(root, *files, bfs_depth=bfs_depth).find()
def find(
root: Union[str, List[str]],
files: Union[str, List[str]],
recursive: bool = True,
max_depth: Optional[int] = None,
) -> List[str]:
"""Finds all non-directory files matching the filename patterns from ``files`` starting from
``root``. This function returns a deterministic result for the same input and directory
structure when run multiple times. Symlinked directories are followed, and unique directories
are searched only once. Each matching file is returned only once at lowest depth in case
multiple paths exist due to symlinked directories. The function has similarities to the Unix
``find`` utility.
def find(root, files, recursive=True, max_depth: Optional[int] = None):
"""Search for ``files`` starting from the ``root`` directory.
Like GNU/BSD find but written entirely in Python.
Specifically this behaves like `find -type f`: it only returns
results that are files. When searching recursively, this behaves
as `find` with the `-L` option (follows symlinks).
Examples:
.. code-block:: console
$ find -L /usr -name python3 -type f
$ find -L /usr -name python
is roughly equivalent to
is equivalent to:
>>> find("/usr", "python3")
with the notable difference that this function only lists a single path to each file in case of
symlinked directories.
>>> find('/usr', 'python')
.. code-block:: console
$ find -L /usr/local/bin /usr/local/sbin -maxdepth 1 '(' -name python3 -o -name getcap \\
')' -type f
$ find /usr/local/bin -maxdepth 1 -name python
is roughly equivalent to:
is equivalent to:
>>> find(["/usr/local/bin", "/usr/local/sbin"], ["python3", "getcap"], recursive=False)
>>> find('/usr/local/bin', 'python', recursive=False)
Accepts any glob characters accepted by fnmatch:
@ -1720,17 +1712,17 @@ def find(
========== ====================================
Parameters:
root: One or more root directories to start searching from
files: One or more filename patterns to search for
recursive: if False search only root, if True descends from roots. Defaults to True.
max_depth: if set, don't search below this depth. Cannot be set if recursive is False
root (str): The root directory to start searching from
files (str or collections.abc.Sequence): Library name(s) to search for
recursive (bool): if False search only root folder,
if True descends top-down from the root. Defaults to True.
max_depth (int): if set, don't search below this depth. Cannot be set
if recursive is False
Returns a list of absolute, matching file paths.
Returns:
list: The files that have been found
"""
if not isinstance(root, list):
root = [root]
if not isinstance(files, list):
if isinstance(files, str):
files = [files]
# If recursive is false, max_depth can only be None or 0
@ -1742,9 +1734,10 @@ def find(
elif max_depth is None:
max_depth = sys.maxsize
tty.debug(f"Find (max depth = {max_depth}): {root} {files}")
result = _find_max_depth(root, files, max_depth)
tty.debug(f"Find complete: {root} {files}")
tty.debug(f"Find (max depth = {max_depth}): {root} {str(files)}")
result = find_max_depth(root, files, max_depth)
tty.debug(f"Find complete: {root} {str(files)}")
return result
@ -1753,36 +1746,56 @@ def _log_file_access_issue(e: OSError, path: str) -> None:
tty.debug(f"find must skip {path}: {errno_name} {e}")
def _dir_id(s: os.stat_result) -> Tuple[int, int]:
# Note: on windows, st_ino is the file index and st_dev is the volume serial number. See
# https://github.com/python/cpython/blob/3.9/Python/fileutils.c
return (s.st_ino, s.st_dev)
@system_path_filter(arg_slice=slice(1))
def find_max_depth(root, globs, max_depth: Optional[int] = None):
"""Given a set of non-recursive glob file patterns, finds all
files matching those patterns up to a maximum specified depth.
If a directory has a name which matches an input pattern, it will
not be included in the results.
def _find_max_depth(roots: List[str], globs: List[str], max_depth: int = sys.maxsize) -> List[str]:
"""See ``find`` for the public API."""
# Apply normcase to file patterns and filenames to respect case insensitive filesystems
regex, groups = fnmatch_translate_multiple([os.path.normcase(x) for x in globs])
# Ordered dictionary that keeps track of the files found for each pattern
capture_group_to_paths: Dict[str, List[str]] = {group: [] for group in groups}
# Ensure returned paths are always absolute
roots = [os.path.abspath(r) for r in roots]
# Breadth-first search queue. Each element is a tuple of (depth, directory)
dir_queue: Deque[Tuple[int, str]] = collections.deque()
# Set of visited directories. Each element is a tuple of (inode, device)
visited_dirs: Set[Tuple[int, int]] = set()
If ``max_depth`` is specified, does not search below that depth.
for root in roots:
try:
stat_root = os.stat(root)
except OSError as e:
_log_file_access_issue(e, root)
continue
dir_id = _dir_id(stat_root)
if dir_id not in visited_dirs:
dir_queue.appendleft((0, root))
visited_dirs.add(dir_id)
If ``globs`` is a list, files matching earlier entries are placed
in the return value before files matching later entries.
"""
try:
stat_root = os.stat(root)
except OSError:
return []
if max_depth is None:
max_depth = sys.maxsize
if isinstance(globs, str):
globs = [globs]
# Apply normcase to regular expressions and to the filenames:
# this respects case-sensitivity semantics of different OSes
# (e.g. file search is typically case-insensitive on Windows)
regexes = [re.compile(fnmatch.translate(os.path.normcase(x))) for x in globs]
# Note later calls to os.scandir etc. return abspaths if the
# input is absolute, see https://docs.python.org/3/library/os.html#os.DirEntry.path
root = os.path.abspath(root)
found_files = collections.defaultdict(list)
def _dir_id(stat_info):
# Note: on windows, st_ino is the file index and st_dev
# is the volume serial number. See
# https://github.com/python/cpython/blob/3.9/Python/fileutils.c
return (stat_info.st_ino, stat_info.st_dev)
visited_dirs = set([_dir_id(stat_root)])
# Each queue item stores the depth and path
# This achieves a consistent traversal order by iterating through
# each directory in alphabetical order.
# This also traverses in BFS order to ensure finding the shortest
# path to any file (or one of the shortest paths, if there are
# several - the one returned will be consistent given the prior
# point).
dir_queue = collections.deque([(0, root)])
while dir_queue:
depth, next_dir = dir_queue.pop()
try:
@ -1797,18 +1810,20 @@ def _find_max_depth(roots: List[str], globs: List[str], max_depth: int = sys.max
try:
it_is_a_dir = dir_entry.is_dir(follow_symlinks=True)
except OSError as e:
# Possible permission issue, or a symlink that cannot be resolved (ELOOP).
# Possible permission issue, or a symlink that cannot
# be resolved (ELOOP).
_log_file_access_issue(e, dir_entry.path)
continue
if it_is_a_dir and depth < max_depth:
if it_is_a_dir and (depth < max_depth):
try:
# The stat should be performed in a try/except block. We repeat that here
# vs. moving to the above block because we only want to call `stat` if we
# haven't exceeded our max_depth
# The stat should be performed in a try/except block.
# We repeat that here vs. moving to the above block
# because we only want to call `stat` if we haven't
# exceeded our max_depth
if sys.platform == "win32":
# Note: st_ino/st_dev on DirEntry.stat are not set on Windows, so we
# have to call os.stat
# Note: st_ino/st_dev on DirEntry.stat are not set on
# Windows, so we have to call os.stat
stat_info = os.stat(dir_entry.path, follow_symlinks=True)
else:
stat_info = dir_entry.stat(follow_symlinks=True)
@ -1821,15 +1836,15 @@ def _find_max_depth(roots: List[str], globs: List[str], max_depth: int = sys.max
dir_queue.appendleft((depth + 1, dir_entry.path))
visited_dirs.add(dir_id)
else:
m = regex.match(os.path.normcase(os.path.basename(dir_entry.path)))
if not m:
continue
for group in capture_group_to_paths:
if m.group(group):
capture_group_to_paths[group].append(dir_entry.path)
break
fname = os.path.basename(dir_entry.path)
for pattern in regexes:
if pattern.match(os.path.normcase(fname)):
found_files[pattern].append(os.path.join(next_dir, fname))
return [path for paths in capture_group_to_paths.values() for path in paths]
# TODO: for fully-recursive searches, we can print a warning after
# after having searched everything up to some fixed depth
return list(itertools.chain(*[found_files[x] for x in regexes]))
# Utilities for libraries and headers

View File

@ -5,14 +5,12 @@
import collections.abc
import contextlib
import fnmatch
import functools
import itertools
import os
import re
import sys
import traceback
import typing
import warnings
from datetime import datetime, timedelta
from typing import Callable, Iterable, List, Tuple, TypeVar
@ -861,32 +859,6 @@ def elide_list(line_list: List[str], max_num: int = 10) -> List[str]:
return line_list
if sys.version_info >= (3, 9):
PatternStr = re.Pattern[str]
else:
PatternStr = typing.Pattern[str]
def fnmatch_translate_multiple(patterns: List[str]) -> Tuple[PatternStr, List[str]]:
"""Same as fnmatch.translate, but creates a single regex of the form
``(?P<pattern0>...)|(?P<pattern1>...)|...`` for each pattern in the iterable, where
``patternN`` is a named capture group that matches the corresponding pattern translated by
``fnmatch.translate``. This can be used to match multiple patterns in a single pass. No case
normalization is performed on the patterns.
Args:
patterns: list of fnmatch patterns
Returns:
Tuple of the combined regex and the list of named capture groups corresponding to each
pattern in the input list.
"""
groups = [f"pattern{i}" for i in range(len(patterns))]
regexes = (fnmatch.translate(p) for p in patterns)
combined = re.compile("|".join(f"(?P<{g}>{r})" for g, r in zip(groups, regexes)))
return combined, groups
@contextlib.contextmanager
def nullcontext(*args, **kwargs):
"""Empty context manager.

View File

@ -1072,16 +1072,16 @@ def test_find_max_depth(dir_structure_with_things_to_find):
# Make sure the paths we use to verify are absolute
assert os.path.isabs(locations["file_one"])
assert set(fs.find(root, "file_*", max_depth=0)) == {locations["file_four"]}
assert set(fs.find(root, "file_*", max_depth=1)) == {
assert set(fs.find_max_depth(root, "file_*", 0)) == {locations["file_four"]}
assert set(fs.find_max_depth(root, "file_*", 1)) == {
locations["file_one"],
locations["file_three"],
locations["file_four"],
}
assert set(fs.find(root, "file_two", max_depth=2)) == {locations["file_two"]}
assert not set(fs.find(root, "file_two", max_depth=1))
assert set(fs.find(root, "file_two")) == {locations["file_two"]}
assert set(fs.find(root, "file_*")) == set(locations.values())
assert set(fs.find_max_depth(root, "file_two", 2)) == {locations["file_two"]}
assert not set(fs.find_max_depth(root, "file_two", 1))
assert set(fs.find_max_depth(root, "file_two")) == {locations["file_two"]}
assert set(fs.find_max_depth(root, "file_*")) == set(locations.values())
def test_find_max_depth_relative(dir_structure_with_things_to_find):
@ -1090,8 +1090,8 @@ def test_find_max_depth_relative(dir_structure_with_things_to_find):
"""
root, locations = dir_structure_with_things_to_find
with fs.working_dir(root):
assert set(fs.find(".", "file_*", max_depth=0)) == {locations["file_four"]}
assert set(fs.find(".", "file_two", max_depth=2)) == {locations["file_two"]}
assert set(fs.find_max_depth(".", "file_*", 0)) == {locations["file_four"]}
assert set(fs.find_max_depth(".", "file_two", 2)) == {locations["file_two"]}
@pytest.mark.parametrize("recursive,max_depth", [(False, -1), (False, 1)])
@ -1105,8 +1105,7 @@ def test_max_depth_and_recursive_errors(tmpdir, recursive, max_depth):
fs.find_libraries(["some_lib"], root, recursive=recursive, max_depth=max_depth)
@pytest.fixture(params=[True, False])
def complex_dir_structure(request, tmpdir):
def dir_structure_with_things_to_find_links(tmpdir, use_junctions=False):
"""
"lx-dy" means "level x, directory y"
"lx-fy" means "level x, file y"
@ -1129,11 +1128,8 @@ def complex_dir_structure(request, tmpdir):
l1-s3 -> l3-d4 # a link that "skips" a directory level
l1-s4 -> l2-s3 # a link to a link to a dir
"""
use_junctions = request.param
if sys.platform == "win32" and not use_junctions and not _windows_can_symlink():
if sys.platform == "win32" and (not use_junctions) and (not _windows_can_symlink()):
pytest.skip("This Windows instance is not configured with symlink support")
elif sys.platform != "win32" and use_junctions:
pytest.skip("Junctions are a Windows-only feature")
l1_d1 = tmpdir.join("l1-d1").ensure(dir=True)
l2_d1 = l1_d1.join("l2-d1").ensure(dir=True)
@ -1154,60 +1150,44 @@ def complex_dir_structure(request, tmpdir):
link_fn(l2_d2, l2_s3)
link_fn(l2_s3, pathlib.Path(tmpdir) / "l1-s4")
locations = {
"l4-f1": str(l3_d2.join("l4-f1").ensure()),
"l4-f2-full": str(l3_d4.join("l4-f2").ensure()),
"l4-f2-link": str(pathlib.Path(tmpdir) / "l1-s3" / "l4-f2"),
"l2-f1": str(l1_d2.join("l2-f1").ensure()),
"l2-f1-link": str(pathlib.Path(tmpdir) / "l1-d1" / "l2-d1" / "l3-s1" / "l2-f1"),
"l3-f3-full": str(l2_d2.join("l3-f3").ensure()),
"l3-f3-link-l1": str(pathlib.Path(tmpdir) / "l1-s4" / "l3-f3"),
}
locations = {}
locations["l4-f1"] = str(l3_d2.join("l4-f1").ensure())
locations["l4-f2-full"] = str(l3_d4.join("l4-f2").ensure())
locations["l4-f2-link"] = str(pathlib.Path(tmpdir) / "l1-s3" / "l4-f2")
locations["l2-f1"] = str(l1_d2.join("l2-f1").ensure())
locations["l2-f1-link"] = str(pathlib.Path(tmpdir) / "l1-d1" / "l2-d1" / "l3-s1" / "l2-f1")
locations["l3-f3-full"] = str(l2_d2.join("l3-f3").ensure())
locations["l3-f3-link-l1"] = str(pathlib.Path(tmpdir) / "l1-s4" / "l3-f3")
return str(tmpdir), locations
def test_find_max_depth_symlinks(complex_dir_structure):
root, locations = complex_dir_structure
def _check_find_links(root, locations):
root = pathlib.Path(root)
assert set(fs.find(root, "l4-f1")) == {locations["l4-f1"]}
assert set(fs.find(root / "l1-s3", "l4-f2", max_depth=0)) == {locations["l4-f2-link"]}
assert set(fs.find(root / "l1-d1", "l2-f1")) == {locations["l2-f1-link"]}
assert set(fs.find_max_depth(root, "l4-f1")) == {locations["l4-f1"]}
assert set(fs.find_max_depth(root / "l1-s3", "l4-f2", 0)) == {locations["l4-f2-link"]}
assert set(fs.find_max_depth(root / "l1-d1", "l2-f1")) == {locations["l2-f1-link"]}
# File is accessible via symlink and subdir, the link path will be
# searched first, and the directory will not be searched again when
# it is encountered the second time (via not-link) in the traversal
assert set(fs.find(root, "l4-f2")) == {locations["l4-f2-link"]}
assert set(fs.find_max_depth(root, "l4-f2")) == {locations["l4-f2-link"]}
# File is accessible only via the dir, so the full file path should
# be reported
assert set(fs.find(root / "l1-d1", "l4-f2")) == {locations["l4-f2-full"]}
assert set(fs.find_max_depth(root / "l1-d1", "l4-f2")) == {locations["l4-f2-full"]}
# Check following links to links
assert set(fs.find(root, "l3-f3")) == {locations["l3-f3-link-l1"]}
assert set(fs.find_max_depth(root, "l3-f3")) == {locations["l3-f3-link-l1"]}
def test_find_max_depth_multiple_and_repeated_entry_points(complex_dir_structure):
root, locations = complex_dir_structure
fst = str(pathlib.Path(root) / "l1-d1" / "l2-d1")
snd = str(pathlib.Path(root) / "l1-d2")
nonexistent = str(pathlib.Path(root) / "nonexistent")
assert set(fs.find([fst, snd, fst, snd, nonexistent], ["l*-f*"], max_depth=1)) == {
locations["l2-f1"],
locations["l4-f1"],
locations["l4-f2-full"],
locations["l3-f3-full"],
}
def test_multiple_patterns(complex_dir_structure):
root, _ = complex_dir_structure
paths = fs.find(root, ["l2-f1", "l3-f3", "*"])
# There shouldn't be duplicate results with multiple, overlapping patterns
assert len(set(paths)) == len(paths)
# All files should be found
filenames = [os.path.basename(p) for p in paths]
assert set(filenames) == {"l2-f1", "l3-f3", "l4-f1", "l4-f2"}
# They are ordered by first matching pattern (this is a bit of an implementation detail,
# and we could decide to change the exact order in the future)
assert filenames[0] == "l2-f1"
assert filenames[1] == "l3-f3"
@pytest.mark.parametrize(
"use_junctions",
[
False,
pytest.param(
True,
marks=pytest.mark.skipif(sys.platform != "win32", reason="Only Windows has junctions"),
),
],
)
def test_find_max_depth_symlinks(tmpdir, use_junctions):
root, locations = dir_structure_with_things_to_find_links(tmpdir, use_junctions=use_junctions)
_check_find_links(root, locations)

View File

@ -373,18 +373,3 @@ class _SomeClass:
_SomeClass.deprecated.error_lvl = 2
with pytest.raises(AttributeError):
_ = s.deprecated
def test_fnmatch_multiple():
regex, groups = llnl.util.lang.fnmatch_translate_multiple(["libf*o.so", "libb*r.so"])
a = regex.match("libfoo.so")
assert a and a.group(groups[0]) == "libfoo.so"
b = regex.match("libbar.so")
assert b and b.group(groups[1]) == "libbar.so"
assert not regex.match("libfoo.so.1")
assert not regex.match("libbar.so.1")
assert not regex.match("libfoo.solibbar.so")
assert not regex.match("libbaz.so")