filesystem: cleanup (#42342)

Type hints and removal of unused code
This commit is contained in:
Harmen Stoppels 2024-01-29 15:43:17 +01:00 committed by GitHub
parent 7ec93a496d
commit 0718e3459a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 116 additions and 253 deletions

View File

@ -1377,120 +1377,89 @@ def traverse_tree(
yield (source_path, dest_path)
def lexists_islink_isdir(path):
"""Computes the tuple (lexists(path), islink(path), isdir(path)) in a minimal
number of stat calls on unix. Use os.path and symlink.islink methods for windows."""
if sys.platform == "win32":
if not os.path.lexists(path):
return False, False, False
return os.path.lexists(path), islink(path), os.path.isdir(path)
# First try to lstat, so we know if it's a link or not.
try:
lst = os.lstat(path)
except (IOError, OSError):
return False, False, False
is_link = stat.S_ISLNK(lst.st_mode)
# Check whether file is a dir.
if not is_link:
is_dir = stat.S_ISDIR(lst.st_mode)
return True, is_link, is_dir
# Check whether symlink points to a dir.
try:
st = os.stat(path)
is_dir = stat.S_ISDIR(st.st_mode)
except (IOError, OSError):
# Dangling symlink (i.e. it lexists but not exists)
is_dir = False
return True, is_link, is_dir
class BaseDirectoryVisitor:
"""Base class and interface for :py:func:`visit_directory_tree`."""
def visit_file(self, root, rel_path, depth):
def visit_file(self, root: str, rel_path: str, depth: int) -> None:
"""Handle the non-symlink file at ``os.path.join(root, rel_path)``
Parameters:
root (str): root directory
rel_path (str): relative path to current file from ``root``
root: root directory
rel_path: relative path to current file from ``root``
depth (int): depth of current file from the ``root`` directory"""
pass
def visit_symlinked_file(self, root, rel_path, depth):
"""Handle the symlink to a file at ``os.path.join(root, rel_path)``.
Note: ``rel_path`` is the location of the symlink, not to what it is
pointing to. The symlink may be dangling.
def visit_symlinked_file(self, root: str, rel_path: str, depth) -> None:
"""Handle the symlink to a file at ``os.path.join(root, rel_path)``. Note: ``rel_path`` is
the location of the symlink, not to what it is pointing to. The symlink may be dangling.
Parameters:
root (str): root directory
rel_path (str): relative path to current symlink from ``root``
depth (int): depth of current symlink from the ``root`` directory"""
root: root directory
rel_path: relative path to current symlink from ``root``
depth: depth of current symlink from the ``root`` directory"""
pass
def before_visit_dir(self, root, rel_path, depth):
def before_visit_dir(self, root: str, rel_path: str, depth: int) -> bool:
"""Return True from this function to recurse into the directory at
os.path.join(root, rel_path). Return False in order not to recurse further.
Parameters:
root (str): root directory
rel_path (str): relative path to current directory from ``root``
depth (int): depth of current directory from the ``root`` directory
root: root directory
rel_path: relative path to current directory from ``root``
depth: depth of current directory from the ``root`` directory
Returns:
bool: ``True`` when the directory should be recursed into. ``False`` when
not"""
return False
def before_visit_symlinked_dir(self, root, rel_path, depth):
"""Return ``True`` to recurse into the symlinked directory and ``False`` in
order not to. Note: ``rel_path`` is the path to the symlink itself.
Following symlinked directories blindly can cause infinite recursion due to
cycles.
def before_visit_symlinked_dir(self, root: str, rel_path: str, depth: int) -> bool:
"""Return ``True`` to recurse into the symlinked directory and ``False`` in order not to.
Note: ``rel_path`` is the path to the symlink itself. Following symlinked directories
blindly can cause infinite recursion due to cycles.
Parameters:
root (str): root directory
rel_path (str): relative path to current symlink from ``root``
depth (int): depth of current symlink from the ``root`` directory
root: root directory
rel_path: relative path to current symlink from ``root``
depth: depth of current symlink from the ``root`` directory
Returns:
bool: ``True`` when the directory should be recursed into. ``False`` when
not"""
return False
def after_visit_dir(self, root, rel_path, depth):
"""Called after recursion into ``rel_path`` finished. This function is not
called when ``rel_path`` was not recursed into.
def after_visit_dir(self, root: str, rel_path: str, depth: int) -> None:
"""Called after recursion into ``rel_path`` finished. This function is not called when
``rel_path`` was not recursed into.
Parameters:
root (str): root directory
rel_path (str): relative path to current directory from ``root``
depth (int): depth of current directory from the ``root`` directory"""
root: root directory
rel_path: relative path to current directory from ``root``
depth: depth of current directory from the ``root`` directory"""
pass
def after_visit_symlinked_dir(self, root, rel_path, depth):
"""Called after recursion into ``rel_path`` finished. This function is not
called when ``rel_path`` was not recursed into.
def after_visit_symlinked_dir(self, root: str, rel_path: str, depth: int) -> None:
"""Called after recursion into ``rel_path`` finished. This function is not called when
``rel_path`` was not recursed into.
Parameters:
root (str): root directory
rel_path (str): relative path to current symlink from ``root``
depth (int): depth of current symlink from the ``root`` directory"""
root: root directory
rel_path: relative path to current symlink from ``root``
depth: depth of current symlink from the ``root`` directory"""
pass
def visit_directory_tree(root, visitor, rel_path="", depth=0):
"""Recurses the directory root depth-first through a visitor pattern using the
interface from :py:class:`BaseDirectoryVisitor`
def visit_directory_tree(
root: str, visitor: BaseDirectoryVisitor, rel_path: str = "", depth: int = 0
):
"""Recurses the directory root depth-first through a visitor pattern using the interface from
:py:class:`BaseDirectoryVisitor`
Parameters:
root (str): path of directory to recurse into
visitor (BaseDirectoryVisitor): what visitor to use
rel_path (str): current relative path from the root
depth (str): current depth from the root
root: path of directory to recurse into
visitor: what visitor to use
rel_path: current relative path from the root
depth: current depth from the root
"""
dir = os.path.join(root, rel_path)
dir_entries = sorted(os.scandir(dir), key=lambda d: d.name)
@ -1498,26 +1467,19 @@ def visit_directory_tree(root, visitor, rel_path="", depth=0):
for f in dir_entries:
rel_child = os.path.join(rel_path, f.name)
islink = f.is_symlink()
# On Windows, symlinks to directories are distinct from
# symlinks to files, and it is possible to create a
# broken symlink to a directory (e.g. using os.symlink
# without `target_is_directory=True`), invoking `isdir`
# on a symlink on Windows that is broken in this manner
# will result in an error. In this case we can work around
# the issue by reading the target and resolving the
# directory ourselves
# On Windows, symlinks to directories are distinct from symlinks to files, and it is
# possible to create a broken symlink to a directory (e.g. using os.symlink without
# `target_is_directory=True`), invoking `isdir` on a symlink on Windows that is broken in
# this manner will result in an error. In this case we can work around the issue by reading
# the target and resolving the directory ourselves
try:
isdir = f.is_dir()
except OSError as e:
if sys.platform == "win32" and hasattr(e, "winerror") and e.winerror == 5 and islink:
# if path is a symlink, determine destination and
# evaluate file vs directory
# if path is a symlink, determine destination and evaluate file vs directory
link_target = resolve_link_target_relative_to_the_link(f)
# link_target might be relative but
# resolve_link_target_relative_to_the_link
# will ensure that if so, that it is relative
# to the CWD and therefore
# makes sense
# link_target might be relative but resolve_link_target_relative_to_the_link
# will ensure that if so, that it is relative to the CWD and therefore makes sense
isdir = os.path.isdir(link_target)
else:
raise e

View File

@ -8,7 +8,7 @@
import filecmp
import os
import shutil
from collections import OrderedDict
from typing import Callable, Dict, List, Optional, Tuple
import llnl.util.tty as tty
from llnl.util.filesystem import BaseDirectoryVisitor, mkdirp, touch, traverse_tree
@ -51,32 +51,30 @@ class SourceMergeVisitor(BaseDirectoryVisitor):
- A list of merge conflicts in dst/
"""
def __init__(self, ignore=None):
def __init__(self, ignore: Optional[Callable[[str], bool]] = None):
self.ignore = ignore if ignore is not None else lambda f: False
# When mapping <src root> to <dst root>/<projection>, we need
# to prepend the <projection> bit to the relative path in the
# destination dir.
self.projection = ""
# When mapping <src root> to <dst root>/<projection>, we need to prepend the <projection>
# bit to the relative path in the destination dir.
self.projection: str = ""
# When a file blocks another file, the conflict can sometimes
# be resolved / ignored (e.g. <prefix>/LICENSE or
# or <site-packages>/<namespace>/__init__.py conflicts can be
# When a file blocks another file, the conflict can sometimes be resolved / ignored
# (e.g. <prefix>/LICENSE or <site-packages>/<namespace>/__init__.py conflicts can be
# ignored).
self.file_conflicts = []
self.file_conflicts: List[MergeConflict] = []
# When we have to create a dir where a file is, or a file
# where a dir is, we have fatal errors, listed here.
self.fatal_conflicts = []
# When we have to create a dir where a file is, or a file where a dir is, we have fatal
# errors, listed here.
self.fatal_conflicts: List[MergeConflict] = []
# What directories we have to make; this is an ordered set,
# so that we have a fast lookup and can run mkdir in order.
self.directories = OrderedDict()
# What directories we have to make; this is an ordered dict, so that we have a fast lookup
# and can run mkdir in order.
self.directories: Dict[str, Tuple[str, str]] = {}
# Files to link. Maps dst_rel to (src_root, src_rel)
self.files = OrderedDict()
self.files: Dict[str, Tuple[str, str]] = {}
def before_visit_dir(self, root, rel_path, depth):
def before_visit_dir(self, root: str, rel_path: str, depth: int) -> bool:
"""
Register a directory if dst / rel_path is not blocked by a file or ignored.
"""
@ -104,7 +102,7 @@ def before_visit_dir(self, root, rel_path, depth):
self.directories[proj_rel_path] = (root, rel_path)
return True
def before_visit_symlinked_dir(self, root, rel_path, depth):
def before_visit_symlinked_dir(self, root: str, rel_path: str, depth: int) -> bool:
"""
Replace symlinked dirs with actual directories when possible in low depths,
otherwise handle it as a file (i.e. we link to the symlink).
@ -136,7 +134,7 @@ def before_visit_symlinked_dir(self, root, rel_path, depth):
self.visit_file(root, rel_path, depth)
return False
def visit_file(self, root, rel_path, depth):
def visit_file(self, root: str, rel_path: str, depth: int) -> None:
proj_rel_path = os.path.join(self.projection, rel_path)
if self.ignore(rel_path):
@ -165,11 +163,11 @@ def visit_file(self, root, rel_path, depth):
# Otherwise register this file to be linked.
self.files[proj_rel_path] = (root, rel_path)
def visit_symlinked_file(self, root, rel_path, depth):
def visit_symlinked_file(self, root: str, rel_path: str, depth: int) -> None:
# Treat symlinked files as ordinary files (without "dereferencing")
self.visit_file(root, rel_path, depth)
def set_projection(self, projection):
def set_projection(self, projection: str) -> None:
self.projection = os.path.normpath(projection)
# Todo, is this how to check in general for empty projection?
@ -197,24 +195,19 @@ def set_projection(self, projection):
class DestinationMergeVisitor(BaseDirectoryVisitor):
"""DestinatinoMergeVisitor takes a SourceMergeVisitor
and:
"""DestinatinoMergeVisitor takes a SourceMergeVisitor and:
a. registers additional conflicts when merging
to the destination prefix
b. removes redundant mkdir operations when
directories already exist in the destination
prefix.
a. registers additional conflicts when merging to the destination prefix
b. removes redundant mkdir operations when directories already exist in the destination prefix.
This also makes sure that symlinked directories
in the target prefix will never be merged with
This also makes sure that symlinked directories in the target prefix will never be merged with
directories in the sources directories.
"""
def __init__(self, source_merge_visitor):
def __init__(self, source_merge_visitor: SourceMergeVisitor):
self.src = source_merge_visitor
def before_visit_dir(self, root, rel_path, depth):
def before_visit_dir(self, root: str, rel_path: str, depth: int) -> bool:
# If destination dir is a file in a src dir, add a conflict,
# and don't traverse deeper
if rel_path in self.src.files:
@ -236,7 +229,7 @@ def before_visit_dir(self, root, rel_path, depth):
# don't descend into it.
return False
def before_visit_symlinked_dir(self, root, rel_path, depth):
def before_visit_symlinked_dir(self, root: str, rel_path: str, depth: int) -> bool:
"""
Symlinked directories in the destination prefix should
be seen as files; we should not accidentally merge
@ -262,7 +255,7 @@ def before_visit_symlinked_dir(self, root, rel_path, depth):
# Never descend into symlinked target dirs.
return False
def visit_file(self, root, rel_path, depth):
def visit_file(self, root: str, rel_path: str, depth: int) -> None:
# Can't merge a file if target already exists
if rel_path in self.src.directories:
src_a_root, src_a_relpath = self.src.directories[rel_path]
@ -280,7 +273,7 @@ def visit_file(self, root, rel_path, depth):
)
)
def visit_symlinked_file(self, root, rel_path, depth):
def visit_symlinked_file(self, root: str, rel_path: str, depth: int) -> None:
# Treat symlinked files as ordinary files (without "dereferencing")
self.visit_file(root, rel_path, depth)

View File

@ -32,6 +32,7 @@
from llnl.util.tty.color import colorize
import spack.config
import spack.paths
import spack.projections
import spack.relocate
import spack.schema.projections
@ -91,7 +92,7 @@ def view_copy(src: str, dst: str, view, spec: Optional[spack.spec.Spec] = None):
prefix_to_projection[spack.store.STORE.layout.root] = view._root
# This is vestigial code for the *old* location of sbang.
prefix_to_projection["#!/bin/bash {0}/bin/sbang".format(spack.paths.spack_root)] = (
prefix_to_projection[f"#!/bin/bash {spack.paths.spack_root}/bin/sbang"] = (
sbang.sbang_shebang_line()
)
@ -100,7 +101,7 @@ def view_copy(src: str, dst: str, view, spec: Optional[spack.spec.Spec] = None):
try:
os.chown(dst, src_stat.st_uid, src_stat.st_gid)
except OSError:
tty.debug("Can't change the permissions for %s" % dst)
tty.debug(f"Can't change the permissions for {dst}")
def view_func_parser(parsed_name):
@ -112,7 +113,7 @@ def view_func_parser(parsed_name):
elif parsed_name in ("add", "symlink", "soft"):
return view_symlink
else:
raise ValueError("invalid link type for view: '%s'" % parsed_name)
raise ValueError(f"invalid link type for view: '{parsed_name}'")
def inverse_view_func_parser(view_type):
@ -270,9 +271,10 @@ def __init__(self, root, layout, **kwargs):
# Ensure projections are the same from each source
# Read projections file from view
if self.projections != self.read_projections():
msg = "View at %s has projections file" % self._root
msg += " which does not match projections passed manually."
raise ConflictingProjectionsError(msg)
raise ConflictingProjectionsError(
f"View at {self._root} has projections file"
" which does not match projections passed manually."
)
self._croot = colorize_root(self._root) + " "
@ -313,11 +315,11 @@ def add_specs(self, *specs, **kwargs):
def add_standalone(self, spec):
if spec.external:
tty.warn(self._croot + "Skipping external package: %s" % colorize_spec(spec))
tty.warn(f"{self._croot}Skipping external package: {colorize_spec(spec)}")
return True
if self.check_added(spec):
tty.warn(self._croot + "Skipping already linked package: %s" % colorize_spec(spec))
tty.warn(f"{self._croot}Skipping already linked package: {colorize_spec(spec)}")
return True
self.merge(spec)
@ -325,7 +327,7 @@ def add_standalone(self, spec):
self.link_meta_folder(spec)
if self.verbose:
tty.info(self._croot + "Linked package: %s" % colorize_spec(spec))
tty.info(f"{self._croot}Linked package: {colorize_spec(spec)}")
return True
def merge(self, spec, ignore=None):
@ -393,7 +395,7 @@ def needs_file(spec, file):
for file in files:
if not os.path.lexists(file):
tty.warn("Tried to remove %s which does not exist" % file)
tty.warn(f"Tried to remove {file} which does not exist")
continue
# remove if file is not owned by any other package in the view
@ -404,7 +406,7 @@ def needs_file(spec, file):
# we are currently removing, as we remove files before unlinking the
# metadata directory.
if len([s for s in specs if needs_file(s, file)]) <= 1:
tty.debug("Removing file " + file)
tty.debug(f"Removing file {file}")
os.remove(file)
def check_added(self, spec):
@ -477,14 +479,14 @@ def remove_standalone(self, spec):
Remove (unlink) a standalone package from this view.
"""
if not self.check_added(spec):
tty.warn(self._croot + "Skipping package not linked in view: %s" % spec.name)
tty.warn(f"{self._croot}Skipping package not linked in view: {spec.name}")
return
self.unmerge(spec)
self.unlink_meta_folder(spec)
if self.verbose:
tty.info(self._croot + "Removed package: %s" % colorize_spec(spec))
tty.info(f"{self._croot}Removed package: {colorize_spec(spec)}")
def get_projection_for_spec(self, spec):
"""
@ -558,9 +560,9 @@ def print_conflict(self, spec_active, spec_specified, level="error"):
linked = tty.color.colorize(" (@gLinked@.)", color=color)
specified = tty.color.colorize("(@rSpecified@.)", color=color)
cprint(
self._croot + "Package conflict detected:\n"
"%s %s\n" % (linked, colorize_spec(spec_active))
+ "%s %s" % (specified, colorize_spec(spec_specified))
f"{self._croot}Package conflict detected:\n"
f"{linked} {colorize_spec(spec_active)}\n"
f"{specified} {colorize_spec(spec_specified)}"
)
def print_status(self, *specs, **kwargs):
@ -572,14 +574,14 @@ def print_status(self, *specs, **kwargs):
for s, v in zip(specs, in_view):
if not v:
tty.error(self._croot + "Package not linked: %s" % s.name)
tty.error(f"{self._croot}Package not linked: {s.name}")
elif s != v:
self.print_conflict(v, s, level="warn")
in_view = list(filter(None, in_view))
if len(specs) > 0:
tty.msg("Packages linked in %s:" % self._croot[:-1])
tty.msg(f"Packages linked in {self._croot[:-1]}:")
# Make a dict with specs keyed by architecture and compiler.
index = index_by(specs, ("architecture", "compiler"))
@ -589,20 +591,19 @@ def print_status(self, *specs, **kwargs):
if i > 0:
print()
header = "%s{%s} / %s{%s}" % (
spack.spec.ARCHITECTURE_COLOR,
architecture,
spack.spec.COMPILER_COLOR,
compiler,
header = (
f"{spack.spec.ARCHITECTURE_COLOR}{{{architecture}}} "
f"/ {spack.spec.COMPILER_COLOR}{{{compiler}}}"
)
tty.hline(colorize(header), char="-")
specs = index[(architecture, compiler)]
specs.sort()
format_string = "{name}{@version}"
format_string += "{%compiler}{compiler_flags}{variants}"
abbreviated = [s.cformat(format_string) for s in specs]
abbreviated = [
s.cformat("{name}{@version}{%compiler}{compiler_flags}{variants}")
for s in specs
]
# Print one spec per line along with prefix path
width = max(len(s) for s in abbreviated)
@ -634,22 +635,19 @@ def unlink_meta_folder(self, spec):
class SimpleFilesystemView(FilesystemView):
"""A simple and partial implementation of FilesystemView focused on
performance and immutable views, where specs cannot be removed after they
were added."""
"""A simple and partial implementation of FilesystemView focused on performance and immutable
views, where specs cannot be removed after they were added."""
def __init__(self, root, layout, **kwargs):
super().__init__(root, layout, **kwargs)
def _sanity_check_view_projection(self, specs):
"""A very common issue is that we end up with two specs of the same
package, that project to the same prefix. We want to catch that as
early as possible and give a sensible error to the user. Here we use
the metadata dir (.spack) projection as a quick test to see whether
two specs in the view are going to clash. The metadata dir is used
because it's always added by Spack with identical files, so a
guaranteed clash that's easily verified."""
seen = dict()
"""A very common issue is that we end up with two specs of the same package, that project
to the same prefix. We want to catch that as early as possible and give a sensible error to
the user. Here we use the metadata dir (.spack) projection as a quick test to see whether
two specs in the view are going to clash. The metadata dir is used because it's always
added by Spack with identical files, so a guaranteed clash that's easily verified."""
seen = {}
for current_spec in specs:
metadata_dir = self.relative_metadata_dir_for_spec(current_spec)
conflicting_spec = seen.get(metadata_dir)
@ -695,13 +693,11 @@ def skip_list(file):
# Inform about file-file conflicts.
if visitor.file_conflicts:
if self.ignore_conflicts:
tty.debug("{0} file conflicts".format(len(visitor.file_conflicts)))
tty.debug(f"{len(visitor.file_conflicts)} file conflicts")
else:
raise MergeConflictSummary(visitor.file_conflicts)
tty.debug(
"Creating {0} dirs and {1} links".format(len(visitor.directories), len(visitor.files))
)
tty.debug(f"Creating {len(visitor.directories)} dirs and {len(visitor.files)} links")
# Make the directory structure
for dst in visitor.directories:

View File

@ -13,8 +13,7 @@
import pytest
import llnl.util.filesystem as fs
import llnl.util.symlink
from llnl.util.symlink import SymlinkError, _windows_can_symlink, islink, symlink
from llnl.util.symlink import islink, symlink
import spack.paths
@ -754,93 +753,6 @@ def test_is_nonsymlink_exe_with_shebang(tmpdir):
assert not fs.is_nonsymlink_exe_with_shebang("symlink_to_executable_script")
@pytest.mark.skipif(sys.platform == "win32", reason="Unix-only test.")
def test_lexists_islink_isdir(tmpdir):
root = str(tmpdir)
# Create a directory and a file, an a bunch of symlinks.
dir = os.path.join(root, "dir")
file = os.path.join(root, "file")
nonexistent = os.path.join(root, "does_not_exist")
symlink_to_dir = os.path.join(root, "symlink_to_dir")
symlink_to_file = os.path.join(root, "symlink_to_file")
dangling_symlink = os.path.join(root, "dangling_symlink")
symlink_to_dangling_symlink = os.path.join(root, "symlink_to_dangling_symlink")
symlink_to_symlink_to_dir = os.path.join(root, "symlink_to_symlink_to_dir")
symlink_to_symlink_to_file = os.path.join(root, "symlink_to_symlink_to_file")
os.mkdir(dir)
with open(file, "wb") as f:
f.write(b"file")
symlink("dir", symlink_to_dir)
symlink("file", symlink_to_file)
symlink("does_not_exist", dangling_symlink)
symlink("dangling_symlink", symlink_to_dangling_symlink)
symlink("symlink_to_dir", symlink_to_symlink_to_dir)
symlink("symlink_to_file", symlink_to_symlink_to_file)
assert fs.lexists_islink_isdir(dir) == (True, False, True)
assert fs.lexists_islink_isdir(file) == (True, False, False)
assert fs.lexists_islink_isdir(nonexistent) == (False, False, False)
assert fs.lexists_islink_isdir(symlink_to_dir) == (True, True, True)
assert fs.lexists_islink_isdir(symlink_to_file) == (True, True, False)
assert fs.lexists_islink_isdir(symlink_to_dangling_symlink) == (True, True, False)
assert fs.lexists_islink_isdir(symlink_to_symlink_to_dir) == (True, True, True)
assert fs.lexists_islink_isdir(symlink_to_symlink_to_file) == (True, True, False)
@pytest.mark.skipif(sys.platform != "win32", reason="For Windows Only")
@pytest.mark.parametrize("win_can_symlink", [True, False])
def test_lexists_islink_isdir_windows(tmpdir, monkeypatch, win_can_symlink):
"""Run on windows without elevated privileges to test junctions and hard links which have
different results from the lexists_islink_isdir method.
"""
if win_can_symlink and not _windows_can_symlink():
pytest.skip("Cannot test dev mode behavior without dev mode enabled.")
with tmpdir.as_cwd():
monkeypatch.setattr(llnl.util.symlink, "_windows_can_symlink", lambda: win_can_symlink)
dir = str(tmpdir.join("dir"))
file = str(tmpdir.join("file"))
nonexistent = str(tmpdir.join("does_not_exist"))
symlink_to_dir = str(tmpdir.join("symlink_to_dir"))
symlink_to_file = str(tmpdir.join("symlink_to_file"))
dangling_symlink = str(tmpdir.join("dangling_symlink"))
symlink_to_dangling_symlink = str(tmpdir.join("symlink_to_dangling_symlink"))
symlink_to_symlink_to_dir = str(tmpdir.join("symlink_to_symlink_to_dir"))
symlink_to_symlink_to_file = str(tmpdir.join("symlink_to_symlink_to_file"))
os.mkdir(dir)
assert fs.lexists_islink_isdir(dir) == (True, False, True)
symlink("dir", symlink_to_dir)
assert fs.lexists_islink_isdir(dir) == (True, False, True)
assert fs.lexists_islink_isdir(symlink_to_dir) == (True, True, True)
with open(file, "wb") as f:
f.write(b"file")
assert fs.lexists_islink_isdir(file) == (True, False, False)
symlink("file", symlink_to_file)
if win_can_symlink:
assert fs.lexists_islink_isdir(file) == (True, False, False)
else:
assert fs.lexists_islink_isdir(file) == (True, True, False)
assert fs.lexists_islink_isdir(symlink_to_file) == (True, True, False)
with pytest.raises(SymlinkError):
symlink("does_not_exist", dangling_symlink)
symlink("dangling_symlink", symlink_to_dangling_symlink)
symlink("symlink_to_dir", symlink_to_symlink_to_dir)
symlink("symlink_to_file", symlink_to_symlink_to_file)
assert fs.lexists_islink_isdir(nonexistent) == (False, False, False)
assert fs.lexists_islink_isdir(symlink_to_dangling_symlink) == (False, False, False)
assert fs.lexists_islink_isdir(symlink_to_symlink_to_dir) == (True, True, True)
assert fs.lexists_islink_isdir(symlink_to_symlink_to_file) == (True, True, False)
class RegisterVisitor(fs.BaseDirectoryVisitor):
"""A directory visitor that keeps track of all visited paths"""