filesystem.py: add max_depth
argument to find
(#41945)
* `find(..., max_depth=...)` can be used to control how many directories at most to descend into below the starting point * `find` now enters every unique (symlinked) directory once at the lowest depth * `find` is now repeatable: it traverses the directory tree in a deterministic order
This commit is contained in:
@@ -1673,16 +1673,20 @@ def find_first(root: str, files: Union[Iterable[str], str], bfs_depth: int = 2)
|
||||
return FindFirstFile(root, *files, bfs_depth=bfs_depth).find()
|
||||
|
||||
|
||||
def find(root, files, recursive=True):
|
||||
def find(root, files, recursive=True, max_depth: Optional[int] = None):
|
||||
"""Search for ``files`` starting from the ``root`` directory.
|
||||
|
||||
Like GNU/BSD find but written entirely in Python.
|
||||
|
||||
Specifically this behaves like `find -type f`: it only returns
|
||||
results that are files. When searching recursively, this behaves
|
||||
as `find` with the `-L` option (follows symlinks).
|
||||
|
||||
Examples:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
$ find /usr -name python
|
||||
$ find -L /usr -name python
|
||||
|
||||
is equivalent to:
|
||||
|
||||
@@ -1712,6 +1716,8 @@ def find(root, files, recursive=True):
|
||||
files (str or collections.abc.Sequence): Library name(s) to search for
|
||||
recursive (bool): if False search only root folder,
|
||||
if True descends top-down from the root. Defaults to True.
|
||||
max_depth (int): if set, don't search below this depth. Cannot be set
|
||||
if recursive is False
|
||||
|
||||
Returns:
|
||||
list: The files that have been found
|
||||
@@ -1719,59 +1725,135 @@ def find(root, files, recursive=True):
|
||||
if isinstance(files, str):
|
||||
files = [files]
|
||||
|
||||
if recursive:
|
||||
tty.debug(f"Find (recursive): {root} {str(files)}")
|
||||
result = _find_recursive(root, files)
|
||||
else:
|
||||
tty.debug(f"Find (not recursive): {root} {str(files)}")
|
||||
result = _find_non_recursive(root, files)
|
||||
# If recursive is false, max_depth can only be None or 0
|
||||
if max_depth and not recursive:
|
||||
raise ValueError(f"max_depth ({max_depth}) cannot be set if recursive is False")
|
||||
|
||||
if not recursive:
|
||||
max_depth = 0
|
||||
elif max_depth is None:
|
||||
max_depth = sys.maxsize
|
||||
|
||||
tty.debug(f"Find (max depth = {max_depth}): {root} {str(files)}")
|
||||
result = find_max_depth(root, files, max_depth)
|
||||
|
||||
tty.debug(f"Find complete: {root} {str(files)}")
|
||||
return result
|
||||
|
||||
|
||||
@system_path_filter
|
||||
def _find_recursive(root, search_files):
|
||||
# The variable here is **on purpose** a defaultdict. The idea is that
|
||||
# we want to poke the filesystem as little as possible, but still maintain
|
||||
# stability in the order of the answer. Thus we are recording each library
|
||||
# found in a key, and reconstructing the stable order later.
|
||||
found_files = collections.defaultdict(list)
|
||||
@system_path_filter(arg_slice=slice(1))
|
||||
def find_max_depth(root, globs, max_depth: Optional[int] = None):
|
||||
"""Given a set of non-recursive glob file patterns, finds all
|
||||
files matching those patterns up to a maximum specified depth.
|
||||
|
||||
# Make the path absolute to have os.walk also return an absolute path
|
||||
root = os.path.abspath(root)
|
||||
for path, _, list_files in os.walk(root):
|
||||
for search_file in search_files:
|
||||
matches = glob.glob(os.path.join(path, search_file))
|
||||
matches = [os.path.join(path, x) for x in matches]
|
||||
found_files[search_file].extend(matches)
|
||||
If a directory has a name which matches an input pattern, it will
|
||||
not be included in the results.
|
||||
|
||||
answer = []
|
||||
for search_file in search_files:
|
||||
answer.extend(found_files[search_file])
|
||||
If ``max_depth`` is specified, does not search below that depth.
|
||||
|
||||
return answer
|
||||
If ``globs`` is a list, files matching earlier entries are placed
|
||||
in the return value before files matching later entries.
|
||||
"""
|
||||
# If root doesn't exist, then we say we found nothing. If it
|
||||
# exists but is not a dir, we assume the user would want to
|
||||
# know; likewise if it exists but we do not have permission to
|
||||
# access it.
|
||||
try:
|
||||
stat_root = os.stat(root)
|
||||
except OSError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
return []
|
||||
else:
|
||||
raise
|
||||
if not stat.S_ISDIR(stat_root.st_mode):
|
||||
raise ValueError(f"{root} is not a directory")
|
||||
|
||||
if max_depth is None:
|
||||
max_depth = sys.maxsize
|
||||
|
||||
@system_path_filter
|
||||
def _find_non_recursive(root, search_files):
|
||||
# The variable here is **on purpose** a defaultdict as os.list_dir
|
||||
# can return files in any order (does not preserve stability)
|
||||
found_files = collections.defaultdict(list)
|
||||
if isinstance(globs, str):
|
||||
globs = [globs]
|
||||
# Apply normcase to regular expressions and to the filenames:
|
||||
# this respects case-sensitivity semantics of different OSes
|
||||
# (e.g. file search is typically case-insensitive on Windows)
|
||||
regexes = [re.compile(fnmatch.translate(os.path.normcase(x))) for x in globs]
|
||||
|
||||
# Make the path absolute to have absolute path returned
|
||||
# Note later calls to os.scandir etc. return abspaths if the
|
||||
# input is absolute, see https://docs.python.org/3/library/os.html#os.DirEntry.path
|
||||
root = os.path.abspath(root)
|
||||
|
||||
for search_file in search_files:
|
||||
matches = glob.glob(os.path.join(root, search_file))
|
||||
matches = [os.path.join(root, x) for x in matches]
|
||||
found_files[search_file].extend(matches)
|
||||
found_files = collections.defaultdict(list)
|
||||
|
||||
answer = []
|
||||
for search_file in search_files:
|
||||
answer.extend(found_files[search_file])
|
||||
def _dir_id(stat_info):
|
||||
# Note: on windows, st_ino is the file index and st_dev
|
||||
# is the volume serial number. See
|
||||
# https://github.com/python/cpython/blob/3.9/Python/fileutils.c
|
||||
return (stat_info.st_ino, stat_info.st_dev)
|
||||
|
||||
return answer
|
||||
def _log_file_access_issue(e):
|
||||
errno_name = errno.errorcode.get(e.errno, "UNKNOWN")
|
||||
tty.debug(f"find must skip {dir_entry.path}: {errno_name} {str(e)}")
|
||||
|
||||
visited_dirs = set([_dir_id(stat_root)])
|
||||
|
||||
# Each queue item stores the depth and path
|
||||
# This achieves a consistent traversal order by iterating through
|
||||
# each directory in alphabetical order.
|
||||
# This also traverses in BFS order to ensure finding the shortest
|
||||
# path to any file (or one of the shortest paths, if there are
|
||||
# several - the one returned will be consistent given the prior
|
||||
# point).
|
||||
dir_queue = collections.deque([(0, root)])
|
||||
while dir_queue:
|
||||
depth, next_dir = dir_queue.pop()
|
||||
try:
|
||||
dir_iter = os.scandir(next_dir)
|
||||
except OSError:
|
||||
# Most commonly, this would be a permissions issue, for
|
||||
# example if we are scanning an external directory like /usr
|
||||
continue
|
||||
|
||||
with dir_iter:
|
||||
ordered_entries = sorted(dir_iter, key=lambda x: x.name)
|
||||
for dir_entry in ordered_entries:
|
||||
try:
|
||||
it_is_a_dir = dir_entry.is_dir(follow_symlinks=True)
|
||||
except OSError as e:
|
||||
# Possible permission issue, or a symlink that cannot
|
||||
# be resolved (ELOOP).
|
||||
_log_file_access_issue(e)
|
||||
continue
|
||||
|
||||
if it_is_a_dir and (depth < max_depth):
|
||||
try:
|
||||
# The stat should be performed in a try/except block.
|
||||
# We repeat that here vs. moving to the above block
|
||||
# because we only want to call `stat` if we haven't
|
||||
# exceeded our max_depth
|
||||
if sys.platform == "win32":
|
||||
# Note: st_ino/st_dev on DirEntry.stat are not set on
|
||||
# Windows, so we have to call os.stat
|
||||
stat_info = os.stat(dir_entry.path, follow_symlinks=True)
|
||||
else:
|
||||
stat_info = dir_entry.stat(follow_symlinks=True)
|
||||
except OSError as e:
|
||||
_log_file_access_issue(e)
|
||||
continue
|
||||
|
||||
dir_id = _dir_id(stat_info)
|
||||
if dir_id not in visited_dirs:
|
||||
dir_queue.appendleft((depth + 1, dir_entry.path))
|
||||
visited_dirs.add(dir_id)
|
||||
else:
|
||||
fname = os.path.basename(dir_entry.path)
|
||||
for pattern in regexes:
|
||||
if pattern.match(os.path.normcase(fname)):
|
||||
found_files[pattern].append(os.path.join(next_dir, fname))
|
||||
|
||||
# TODO: for fully-recursive searches, we can print a warning after
|
||||
# after having searched everything up to some fixed depth
|
||||
|
||||
return list(itertools.chain(*[found_files[x] for x in regexes]))
|
||||
|
||||
|
||||
# Utilities for libraries and headers
|
||||
@@ -2210,7 +2292,9 @@ def find_system_libraries(libraries, shared=True):
|
||||
return libraries_found
|
||||
|
||||
|
||||
def find_libraries(libraries, root, shared=True, recursive=False, runtime=True):
|
||||
def find_libraries(
|
||||
libraries, root, shared=True, recursive=False, runtime=True, max_depth: Optional[int] = None
|
||||
):
|
||||
"""Returns an iterable of full paths to libraries found in a root dir.
|
||||
|
||||
Accepts any glob characters accepted by fnmatch:
|
||||
@@ -2231,6 +2315,8 @@ def find_libraries(libraries, root, shared=True, recursive=False, runtime=True):
|
||||
otherwise for static. Defaults to True.
|
||||
recursive (bool): if False search only root folder,
|
||||
if True descends top-down from the root. Defaults to False.
|
||||
max_depth (int): if set, don't search below this depth. Cannot be set
|
||||
if recursive is False
|
||||
runtime (bool): Windows only option, no-op elsewhere. If true,
|
||||
search for runtime shared libs (.DLL), otherwise, search
|
||||
for .Lib files. If shared is false, this has no meaning.
|
||||
@@ -2239,6 +2325,7 @@ def find_libraries(libraries, root, shared=True, recursive=False, runtime=True):
|
||||
Returns:
|
||||
LibraryList: The libraries that have been found
|
||||
"""
|
||||
|
||||
if isinstance(libraries, str):
|
||||
libraries = [libraries]
|
||||
elif not isinstance(libraries, collections.abc.Sequence):
|
||||
@@ -2271,8 +2358,10 @@ def find_libraries(libraries, root, shared=True, recursive=False, runtime=True):
|
||||
libraries = ["{0}.{1}".format(lib, suffix) for lib in libraries for suffix in suffixes]
|
||||
|
||||
if not recursive:
|
||||
if max_depth:
|
||||
raise ValueError(f"max_depth ({max_depth}) cannot be set if recursive is False")
|
||||
# If not recursive, look for the libraries directly in root
|
||||
return LibraryList(find(root, libraries, False))
|
||||
return LibraryList(find(root, libraries, recursive=False))
|
||||
|
||||
# To speedup the search for external packages configured e.g. in /usr,
|
||||
# perform first non-recursive search in root/lib then in root/lib64 and
|
||||
@@ -2290,7 +2379,7 @@ def find_libraries(libraries, root, shared=True, recursive=False, runtime=True):
|
||||
if found_libs:
|
||||
break
|
||||
else:
|
||||
found_libs = find(root, libraries, True)
|
||||
found_libs = find(root, libraries, recursive=True, max_depth=max_depth)
|
||||
|
||||
return LibraryList(found_libs)
|
||||
|
||||
|
Reference in New Issue
Block a user