Add llnl.util.filesystem.find_first (#36083)

Add a `find_first` method that locates one instance of a file
that matches a specified pattern by recursively searching a directory
tree. Unlike other `find` methods, this only locates one file at most,
so can use optimizations that avoid searching the entire tree:
Typically the relevant files are at low depth, so it makes sense to
locate files through iterative deepening and early exit.
This commit is contained in:
Harmen Stoppels 2023-03-27 18:42:16 +02:00 committed by GitHub
parent c5b3fc6929
commit 5072e48dab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 168 additions and 1 deletions

View File

@ -5,18 +5,20 @@
import collections
import collections.abc
import errno
import fnmatch
import glob
import hashlib
import itertools
import numbers
import os
import posixpath
import re
import shutil
import stat
import sys
import tempfile
from contextlib import contextmanager
from typing import Callable, List, Match, Optional, Tuple, Union
from typing import Callable, Iterable, List, Match, Optional, Tuple, Union
from llnl.util import tty
from llnl.util.lang import dedupe, memoized
@ -1671,6 +1673,38 @@ def fix_darwin_install_name(path):
break
def find_first(root: str, files: Union[Iterable[str], str], bfs_depth: int = 2) -> Optional[str]:
"""Find the first file matching a pattern.
The following
.. code-block:: console
$ find /usr -name 'abc*' -o -name 'def*' -quit
is equivalent to:
>>> find_first("/usr", ["abc*", "def*"])
Any glob pattern supported by fnmatch can be used.
The search order of this method is breadth-first over directories,
until depth bfs_depth, after which depth-first search is used.
Parameters:
root (str): The root directory to start searching from
files (str or Iterable): File pattern(s) to search for
bfs_depth (int): (advanced) parameter that specifies at which
depth to switch to depth-first search.
Returns:
str or None: The matching file or None when no file is found.
"""
if isinstance(files, str):
files = [files]
return FindFirstFile(root, *files, bfs_depth=bfs_depth).find()
def find(root, files, recursive=True):
"""Search for ``files`` starting from the ``root`` directory.
@ -2720,3 +2754,105 @@ def filesummary(path, print_bytes=16) -> Tuple[int, bytes]:
return size, short_contents
except OSError:
return 0, b""
class FindFirstFile:
"""Uses hybrid iterative deepening to locate the first matching
file. Up to depth ``bfs_depth`` it uses iterative deepening, which
mimics breadth-first with the same memory footprint as depth-first
search, after which it switches to ordinary depth-first search using
``os.walk``."""
def __init__(self, root: str, *file_patterns: str, bfs_depth: int = 2):
"""Create a small summary of the given file. Does not error
when file does not exist.
Args:
root (str): directory in which to recursively search
file_patterns (str): glob file patterns understood by fnmatch
bfs_depth (int): until this depth breadth-first traversal is used,
when no match is found, the mode is switched to depth-first search.
"""
self.root = root
self.bfs_depth = bfs_depth
self.match: Callable
# normcase is trivial on posix
regex = re.compile("|".join(fnmatch.translate(os.path.normcase(p)) for p in file_patterns))
# On case sensitive filesystems match against normcase'd paths.
if os.path is posixpath:
self.match = regex.match
else:
self.match = lambda p: regex.match(os.path.normcase(p))
def find(self) -> Optional[str]:
"""Run the file search
Returns:
str or None: path of the matching file
"""
self.file = None
# First do iterative deepening (i.e. bfs through limited depth dfs)
for i in range(self.bfs_depth + 1):
if self._find_at_depth(self.root, i):
return self.file
# Then fall back to depth-first search
return self._find_dfs()
def _find_at_depth(self, path, max_depth, depth=0) -> bool:
"""Returns True when done. Notice search can be done
either because a file was found, or because it recursed
through all directories."""
try:
entries = os.scandir(path)
except OSError:
return True
done = True
with entries:
# At max depth we look for matching files.
if depth == max_depth:
for f in entries:
# Exit on match
if self.match(f.name):
self.file = os.path.join(path, f.name)
return True
# is_dir should not require a stat call, so it's a good optimization.
if self._is_dir(f):
done = False
return done
# At lower depth only recurse into subdirs
for f in entries:
if not self._is_dir(f):
continue
# If any subdir is not fully traversed, we're not done yet.
if not self._find_at_depth(os.path.join(path, f.name), max_depth, depth + 1):
done = False
# Early exit when we've found something.
if self.file:
return True
return done
def _is_dir(self, f: os.DirEntry) -> bool:
"""Returns True when f is dir we can enter (and not a symlink)."""
try:
return f.is_dir(follow_symlinks=False)
except OSError:
return False
def _find_dfs(self) -> Optional[str]:
"""Returns match or None"""
for dirpath, _, filenames in os.walk(self.root):
for file in filenames:
if self.match(file):
return os.path.join(dirpath, file)
return None

View File

@ -871,3 +871,34 @@ def test_filesummary(tmpdir):
assert fs.filesummary(p, print_bytes=8) == (26, b"abcdefgh...stuvwxyz")
assert fs.filesummary(p, print_bytes=13) == (26, b"abcdefghijklmnopqrstuvwxyz")
assert fs.filesummary(p, print_bytes=100) == (26, b"abcdefghijklmnopqrstuvwxyz")
@pytest.mark.parametrize("bfs_depth", [1, 2, 10])
def test_find_first_file(tmpdir, bfs_depth):
# Create a structure: a/a/a/{file1,file2}, b/a, c/a, d/{a,file1}
tmpdir.join("a", "a", "a").ensure(dir=True)
tmpdir.join("b", "a").ensure(dir=True)
tmpdir.join("c", "a").ensure(dir=True)
tmpdir.join("d", "a").ensure(dir=True)
tmpdir.join("e").ensure(dir=True)
fs.touch(tmpdir.join("a", "a", "a", "file1"))
fs.touch(tmpdir.join("a", "a", "a", "file2"))
fs.touch(tmpdir.join("d", "file1"))
root = str(tmpdir)
# Iterative deepening: should find low-depth file1.
assert os.path.samefile(
fs.find_first(root, "file*", bfs_depth=bfs_depth), os.path.join(root, "d", "file1")
)
assert fs.find_first(root, "nonexisting", bfs_depth=bfs_depth) is None
assert os.path.samefile(
fs.find_first(root, ["nonexisting", "file2"], bfs_depth=bfs_depth),
os.path.join(root, "a", "a", "a", "file2"),
)
# Should find first dir
assert os.path.samefile(fs.find_first(root, "a", bfs_depth=bfs_depth), os.path.join(root, "a"))