Windows symlinking support (#38599)
This reapplies 66f7540, which adds supports for hardlinks/junctions on
Windows systems where developer mode is not enabled.
The commit was reverted on account of multiple issues:
* Checks added to prevent dangling symlinks were interfering with
  existing CI builds on Linux (i.e. builds that otherwise succeed were
  failing for creating dangling symlinks).
* The logic also updated symlinking to perform redirection of relative
  paths, which lead to malformed symlinks.
This commit fixes these issues.
			
			
This commit is contained in:
		@@ -18,11 +18,13 @@
 | 
			
		||||
import sys
 | 
			
		||||
import tempfile
 | 
			
		||||
from contextlib import contextmanager
 | 
			
		||||
from itertools import accumulate
 | 
			
		||||
from typing import Callable, Iterable, List, Match, Optional, Tuple, Union
 | 
			
		||||
 | 
			
		||||
import llnl.util.symlink
 | 
			
		||||
from llnl.util import tty
 | 
			
		||||
from llnl.util.lang import dedupe, memoized
 | 
			
		||||
from llnl.util.symlink import islink, symlink
 | 
			
		||||
from llnl.util.symlink import islink, readlink, resolve_link_target_relative_to_the_link, symlink
 | 
			
		||||
 | 
			
		||||
from spack.util.executable import Executable, which
 | 
			
		||||
from spack.util.path import path_to_os_path, system_path_filter
 | 
			
		||||
@@ -101,7 +103,7 @@ def _nop(args, ns=None, follow_symlinks=None):
 | 
			
		||||
            pass
 | 
			
		||||
 | 
			
		||||
        # follow symlinks (aka don't not follow symlinks)
 | 
			
		||||
        follow = follow_symlinks or not (os.path.islink(src) and os.path.islink(dst))
 | 
			
		||||
        follow = follow_symlinks or not (islink(src) and islink(dst))
 | 
			
		||||
        if follow:
 | 
			
		||||
            # use the real function if it exists
 | 
			
		||||
            def lookup(name):
 | 
			
		||||
@@ -169,7 +171,7 @@ def rename(src, dst):
 | 
			
		||||
    if sys.platform == "win32":
 | 
			
		||||
        # Windows path existence checks will sometimes fail on junctions/links/symlinks
 | 
			
		||||
        # so check for that case
 | 
			
		||||
        if os.path.exists(dst) or os.path.islink(dst):
 | 
			
		||||
        if os.path.exists(dst) or islink(dst):
 | 
			
		||||
            os.remove(dst)
 | 
			
		||||
    os.rename(src, dst)
 | 
			
		||||
 | 
			
		||||
@@ -566,7 +568,7 @@ def set_install_permissions(path):
 | 
			
		||||
    # If this points to a file maintained in a Spack prefix, it is assumed that
 | 
			
		||||
    # this function will be invoked on the target. If the file is outside a
 | 
			
		||||
    # Spack-maintained prefix, the permissions should not be modified.
 | 
			
		||||
    if os.path.islink(path):
 | 
			
		||||
    if islink(path):
 | 
			
		||||
        return
 | 
			
		||||
    if os.path.isdir(path):
 | 
			
		||||
        os.chmod(path, 0o755)
 | 
			
		||||
@@ -635,7 +637,7 @@ def chmod_x(entry, perms):
 | 
			
		||||
@system_path_filter
 | 
			
		||||
def copy_mode(src, dest):
 | 
			
		||||
    """Set the mode of dest to that of src unless it is a link."""
 | 
			
		||||
    if os.path.islink(dest):
 | 
			
		||||
    if islink(dest):
 | 
			
		||||
        return
 | 
			
		||||
    src_mode = os.stat(src).st_mode
 | 
			
		||||
    dest_mode = os.stat(dest).st_mode
 | 
			
		||||
@@ -721,26 +723,12 @@ def install(src, dest):
 | 
			
		||||
    copy(src, dest, _permissions=True)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@system_path_filter
 | 
			
		||||
def resolve_link_target_relative_to_the_link(link):
 | 
			
		||||
    """
 | 
			
		||||
    os.path.isdir uses os.path.exists, which for links will check
 | 
			
		||||
    the existence of the link target. If the link target is relative to
 | 
			
		||||
    the link, we need to construct a pathname that is valid from
 | 
			
		||||
    our cwd (which may not be the same as the link's directory)
 | 
			
		||||
    """
 | 
			
		||||
    target = os.readlink(link)
 | 
			
		||||
    if os.path.isabs(target):
 | 
			
		||||
        return target
 | 
			
		||||
    link_dir = os.path.dirname(os.path.abspath(link))
 | 
			
		||||
    return os.path.join(link_dir, target)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@system_path_filter
 | 
			
		||||
def copy_tree(
 | 
			
		||||
    src: str,
 | 
			
		||||
    dest: str,
 | 
			
		||||
    symlinks: bool = True,
 | 
			
		||||
    allow_broken_symlinks: bool = sys.platform != "win32",
 | 
			
		||||
    ignore: Optional[Callable[[str], bool]] = None,
 | 
			
		||||
    _permissions: bool = False,
 | 
			
		||||
):
 | 
			
		||||
@@ -763,6 +751,8 @@ def copy_tree(
 | 
			
		||||
        src (str): the directory to copy
 | 
			
		||||
        dest (str): the destination directory
 | 
			
		||||
        symlinks (bool): whether or not to preserve symlinks
 | 
			
		||||
        allow_broken_symlinks (bool): whether or not to allow broken (dangling) symlinks,
 | 
			
		||||
            On Windows, setting this to True will raise an exception. Defaults to true on unix.
 | 
			
		||||
        ignore (typing.Callable): function indicating which files to ignore
 | 
			
		||||
        _permissions (bool): for internal use only
 | 
			
		||||
 | 
			
		||||
@@ -770,6 +760,8 @@ def copy_tree(
 | 
			
		||||
        IOError: if *src* does not match any files or directories
 | 
			
		||||
        ValueError: if *src* is a parent directory of *dest*
 | 
			
		||||
    """
 | 
			
		||||
    if allow_broken_symlinks and sys.platform == "win32":
 | 
			
		||||
        raise llnl.util.symlink.SymlinkError("Cannot allow broken symlinks on Windows!")
 | 
			
		||||
    if _permissions:
 | 
			
		||||
        tty.debug("Installing {0} to {1}".format(src, dest))
 | 
			
		||||
    else:
 | 
			
		||||
@@ -783,6 +775,11 @@ def copy_tree(
 | 
			
		||||
    if not files:
 | 
			
		||||
        raise IOError("No such file or directory: '{0}'".format(src))
 | 
			
		||||
 | 
			
		||||
    # For Windows hard-links and junctions, the source path must exist to make a symlink. Add
 | 
			
		||||
    # all symlinks to this list while traversing the tree, then when finished, make all
 | 
			
		||||
    # symlinks at the end.
 | 
			
		||||
    links = []
 | 
			
		||||
 | 
			
		||||
    for src in files:
 | 
			
		||||
        abs_src = os.path.abspath(src)
 | 
			
		||||
        if not abs_src.endswith(os.path.sep):
 | 
			
		||||
@@ -805,7 +802,7 @@ def copy_tree(
 | 
			
		||||
            ignore=ignore,
 | 
			
		||||
            follow_nonexisting=True,
 | 
			
		||||
        ):
 | 
			
		||||
            if os.path.islink(s):
 | 
			
		||||
            if islink(s):
 | 
			
		||||
                link_target = resolve_link_target_relative_to_the_link(s)
 | 
			
		||||
                if symlinks:
 | 
			
		||||
                    target = os.readlink(s)
 | 
			
		||||
@@ -819,7 +816,9 @@ def escaped_path(path):
 | 
			
		||||
                            tty.debug("Redirecting link {0} to {1}".format(target, new_target))
 | 
			
		||||
                            target = new_target
 | 
			
		||||
 | 
			
		||||
                    symlink(target, d)
 | 
			
		||||
                    links.append((target, d, s))
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                elif os.path.isdir(link_target):
 | 
			
		||||
                    mkdirp(d)
 | 
			
		||||
                else:
 | 
			
		||||
@@ -834,9 +833,17 @@ def escaped_path(path):
 | 
			
		||||
                set_install_permissions(d)
 | 
			
		||||
                copy_mode(s, d)
 | 
			
		||||
 | 
			
		||||
    for target, d, s in links:
 | 
			
		||||
        symlink(target, d, allow_broken_symlinks=allow_broken_symlinks)
 | 
			
		||||
        if _permissions:
 | 
			
		||||
            set_install_permissions(d)
 | 
			
		||||
            copy_mode(s, d)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@system_path_filter
 | 
			
		||||
def install_tree(src, dest, symlinks=True, ignore=None):
 | 
			
		||||
def install_tree(
 | 
			
		||||
    src, dest, symlinks=True, ignore=None, allow_broken_symlinks=sys.platform != "win32"
 | 
			
		||||
):
 | 
			
		||||
    """Recursively install an entire directory tree rooted at *src*.
 | 
			
		||||
 | 
			
		||||
    Same as :py:func:`copy_tree` with the addition of setting proper
 | 
			
		||||
@@ -847,12 +854,21 @@ def install_tree(src, dest, symlinks=True, ignore=None):
 | 
			
		||||
        dest (str): the destination directory
 | 
			
		||||
        symlinks (bool): whether or not to preserve symlinks
 | 
			
		||||
        ignore (typing.Callable): function indicating which files to ignore
 | 
			
		||||
        allow_broken_symlinks (bool): whether or not to allow broken (dangling) symlinks,
 | 
			
		||||
            On Windows, setting this to True will raise an exception.
 | 
			
		||||
 | 
			
		||||
    Raises:
 | 
			
		||||
        IOError: if *src* does not match any files or directories
 | 
			
		||||
        ValueError: if *src* is a parent directory of *dest*
 | 
			
		||||
    """
 | 
			
		||||
    copy_tree(src, dest, symlinks=symlinks, ignore=ignore, _permissions=True)
 | 
			
		||||
    copy_tree(
 | 
			
		||||
        src,
 | 
			
		||||
        dest,
 | 
			
		||||
        symlinks=symlinks,
 | 
			
		||||
        allow_broken_symlinks=allow_broken_symlinks,
 | 
			
		||||
        ignore=ignore,
 | 
			
		||||
        _permissions=True,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@system_path_filter
 | 
			
		||||
@@ -1256,7 +1272,12 @@ def traverse_tree(
 | 
			
		||||
    Keyword Arguments:
 | 
			
		||||
        order (str): Whether to do pre- or post-order traversal. Accepted
 | 
			
		||||
            values are 'pre' and 'post'
 | 
			
		||||
        ignore (typing.Callable): function indicating which files to ignore
 | 
			
		||||
        ignore (typing.Callable): function indicating which files to ignore. This will also
 | 
			
		||||
            ignore symlinks if they point to an ignored file (regardless of whether the symlink
 | 
			
		||||
            is explicitly ignored); note this only supports one layer of indirection (i.e. if
 | 
			
		||||
            you have x -> y -> z, and z is ignored but x/y are not, then y would be ignored
 | 
			
		||||
            but not x). To avoid this, make sure the ignore function also ignores the symlink
 | 
			
		||||
            paths too.
 | 
			
		||||
        follow_nonexisting (bool): Whether to descend into directories in
 | 
			
		||||
            ``src`` that do not exit in ``dest``. Default is True
 | 
			
		||||
        follow_links (bool): Whether to descend into symlinks in ``src``
 | 
			
		||||
@@ -1283,11 +1304,24 @@ def traverse_tree(
 | 
			
		||||
        dest_child = os.path.join(dest_path, f)
 | 
			
		||||
        rel_child = os.path.join(rel_path, f)
 | 
			
		||||
 | 
			
		||||
        # If the source path is a link and the link's source is ignored, then ignore the link too,
 | 
			
		||||
        # but only do this if the ignore is defined.
 | 
			
		||||
        if ignore is not None:
 | 
			
		||||
            if islink(source_child) and not follow_links:
 | 
			
		||||
                target = readlink(source_child)
 | 
			
		||||
                all_parents = accumulate(target.split(os.sep), lambda x, y: os.path.join(x, y))
 | 
			
		||||
                if any(map(ignore, all_parents)):
 | 
			
		||||
                    tty.warn(
 | 
			
		||||
                        f"Skipping {source_path} because the source or a part of the source's "
 | 
			
		||||
                        f"path is included in the ignores."
 | 
			
		||||
                    )
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
        # Treat as a directory
 | 
			
		||||
        # TODO: for symlinks, os.path.isdir looks for the link target. If the
 | 
			
		||||
        # target is relative to the link, then that may not resolve properly
 | 
			
		||||
        # relative to our cwd - see resolve_link_target_relative_to_the_link
 | 
			
		||||
        if os.path.isdir(source_child) and (follow_links or not os.path.islink(source_child)):
 | 
			
		||||
        if os.path.isdir(source_child) and (follow_links or not islink(source_child)):
 | 
			
		||||
            # When follow_nonexisting isn't set, don't descend into dirs
 | 
			
		||||
            # in source that do not exist in dest
 | 
			
		||||
            if follow_nonexisting or os.path.exists(dest_child):
 | 
			
		||||
@@ -1313,7 +1347,11 @@ def traverse_tree(
 | 
			
		||||
 | 
			
		||||
def lexists_islink_isdir(path):
 | 
			
		||||
    """Computes the tuple (lexists(path), islink(path), isdir(path)) in a minimal
 | 
			
		||||
    number of stat calls."""
 | 
			
		||||
    number of stat calls on unix. Use os.path and symlink.islink methods for windows."""
 | 
			
		||||
    if sys.platform == "win32":
 | 
			
		||||
        if not os.path.lexists(path):
 | 
			
		||||
            return False, False, False
 | 
			
		||||
        return os.path.lexists(path), islink(path), os.path.isdir(path)
 | 
			
		||||
    # First try to lstat, so we know if it's a link or not.
 | 
			
		||||
    try:
 | 
			
		||||
        lst = os.lstat(path)
 | 
			
		||||
@@ -1528,7 +1566,7 @@ def remove_if_dead_link(path):
 | 
			
		||||
    Parameters:
 | 
			
		||||
        path (str): The potential dead link
 | 
			
		||||
    """
 | 
			
		||||
    if os.path.islink(path) and not os.path.exists(path):
 | 
			
		||||
    if islink(path) and not os.path.exists(path):
 | 
			
		||||
        os.unlink(path)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -1587,7 +1625,7 @@ def remove_linked_tree(path):
 | 
			
		||||
        kwargs["onerror"] = readonly_file_handler(ignore_errors=True)
 | 
			
		||||
 | 
			
		||||
    if os.path.exists(path):
 | 
			
		||||
        if os.path.islink(path):
 | 
			
		||||
        if islink(path):
 | 
			
		||||
            shutil.rmtree(os.path.realpath(path), **kwargs)
 | 
			
		||||
            os.unlink(path)
 | 
			
		||||
        else:
 | 
			
		||||
@@ -2693,7 +2731,7 @@ def remove_directory_contents(dir):
 | 
			
		||||
    """Remove all contents of a directory."""
 | 
			
		||||
    if os.path.exists(dir):
 | 
			
		||||
        for entry in [os.path.join(dir, entry) for entry in os.listdir(dir)]:
 | 
			
		||||
            if os.path.isfile(entry) or os.path.islink(entry):
 | 
			
		||||
            if os.path.isfile(entry) or islink(entry):
 | 
			
		||||
                os.unlink(entry)
 | 
			
		||||
            else:
 | 
			
		||||
                shutil.rmtree(entry)
 | 
			
		||||
 
 | 
			
		||||
@@ -2,77 +2,188 @@
 | 
			
		||||
# Spack Project Developers. See the top-level COPYRIGHT file for details.
 | 
			
		||||
#
 | 
			
		||||
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
 | 
			
		||||
import errno
 | 
			
		||||
import os
 | 
			
		||||
import re
 | 
			
		||||
import shutil
 | 
			
		||||
import subprocess
 | 
			
		||||
import sys
 | 
			
		||||
import tempfile
 | 
			
		||||
from os.path import exists, join
 | 
			
		||||
 | 
			
		||||
from llnl.util import lang
 | 
			
		||||
from llnl.util import lang, tty
 | 
			
		||||
 | 
			
		||||
from spack.error import SpackError
 | 
			
		||||
from spack.util.path import system_path_filter
 | 
			
		||||
 | 
			
		||||
if sys.platform == "win32":
 | 
			
		||||
    from win32file import CreateHardLink
 | 
			
		||||
 | 
			
		||||
is_windows = sys.platform == "win32"
 | 
			
		||||
 | 
			
		||||
def symlink(real_path, link_path):
 | 
			
		||||
    """
 | 
			
		||||
    Create a symbolic link.
 | 
			
		||||
 | 
			
		||||
    On Windows, use junctions if os.symlink fails.
 | 
			
		||||
def symlink(source_path: str, link_path: str, allow_broken_symlinks: bool = not is_windows):
 | 
			
		||||
    """
 | 
			
		||||
    if sys.platform != "win32":
 | 
			
		||||
        os.symlink(real_path, link_path)
 | 
			
		||||
    elif _win32_can_symlink():
 | 
			
		||||
        # Windows requires target_is_directory=True when the target is a dir.
 | 
			
		||||
        os.symlink(real_path, link_path, target_is_directory=os.path.isdir(real_path))
 | 
			
		||||
    else:
 | 
			
		||||
        try:
 | 
			
		||||
            # Try to use junctions
 | 
			
		||||
            _win32_junction(real_path, link_path)
 | 
			
		||||
        except OSError as e:
 | 
			
		||||
            if e.errno == errno.EEXIST:
 | 
			
		||||
                # EEXIST error indicates that file we're trying to "link"
 | 
			
		||||
                # is already present, don't bother trying to copy which will also fail
 | 
			
		||||
                # just raise
 | 
			
		||||
                raise
 | 
			
		||||
    Create a link.
 | 
			
		||||
 | 
			
		||||
    On non-Windows and Windows with System Administrator
 | 
			
		||||
    privleges this will be a normal symbolic link via
 | 
			
		||||
    os.symlink.
 | 
			
		||||
 | 
			
		||||
    On Windows without privledges the link will be a
 | 
			
		||||
    junction for a directory and a hardlink for a file.
 | 
			
		||||
    On Windows the various link types are:
 | 
			
		||||
 | 
			
		||||
    Symbolic Link: A link to a file or directory on the
 | 
			
		||||
    same or different volume (drive letter) or even to
 | 
			
		||||
    a remote file or directory (using UNC in its path).
 | 
			
		||||
    Need System Administrator privileges to make these.
 | 
			
		||||
 | 
			
		||||
    Hard Link: A link to a file on the same volume (drive
 | 
			
		||||
    letter) only. Every file (file's data) has at least 1
 | 
			
		||||
    hard link (file's name). But when this method creates
 | 
			
		||||
    a new hard link there will be 2. Deleting all hard
 | 
			
		||||
    links effectively deletes the file. Don't need System
 | 
			
		||||
    Administrator privileges.
 | 
			
		||||
 | 
			
		||||
    Junction: A link to a directory on the same or different
 | 
			
		||||
    volume (drive letter) but not to a remote directory. Don't
 | 
			
		||||
    need System Administrator privileges.
 | 
			
		||||
 | 
			
		||||
    Parameters:
 | 
			
		||||
        source_path (str): The real file or directory that the link points to.
 | 
			
		||||
            Must be absolute OR relative to the link.
 | 
			
		||||
        link_path (str): The path where the link will exist.
 | 
			
		||||
        allow_broken_symlinks (bool): On Linux or Mac, don't raise an exception if the source_path
 | 
			
		||||
            doesn't exist. This will still raise an exception on Windows.
 | 
			
		||||
    """
 | 
			
		||||
    source_path = os.path.normpath(source_path)
 | 
			
		||||
    win_source_path = source_path
 | 
			
		||||
    link_path = os.path.normpath(link_path)
 | 
			
		||||
 | 
			
		||||
    # Never allow broken links on Windows.
 | 
			
		||||
    if sys.platform == "win32" and allow_broken_symlinks:
 | 
			
		||||
        raise ValueError("allow_broken_symlinks parameter cannot be True on Windows.")
 | 
			
		||||
 | 
			
		||||
    if not allow_broken_symlinks:
 | 
			
		||||
        # Perform basic checks to make sure symlinking will succeed
 | 
			
		||||
        if os.path.lexists(link_path):
 | 
			
		||||
            raise SymlinkError(f"Link path ({link_path}) already exists. Cannot create link.")
 | 
			
		||||
 | 
			
		||||
        if not os.path.exists(source_path):
 | 
			
		||||
            if os.path.isabs(source_path) and not allow_broken_symlinks:
 | 
			
		||||
                # An absolute source path that does not exist will result in a broken link.
 | 
			
		||||
                raise SymlinkError(
 | 
			
		||||
                    f"Source path ({source_path}) is absolute but does not exist. Resulting "
 | 
			
		||||
                    f"link would be broken so not making link."
 | 
			
		||||
                )
 | 
			
		||||
            else:
 | 
			
		||||
                # If all else fails, fall back to copying files
 | 
			
		||||
                shutil.copyfile(real_path, link_path)
 | 
			
		||||
                # os.symlink can create a link when the given source path is relative to
 | 
			
		||||
                # the link path. Emulate this behavior and check to see if the source exists
 | 
			
		||||
                # relative to the link patg ahead of link creation to prevent broken
 | 
			
		||||
                # links from being made.
 | 
			
		||||
                link_parent_dir = os.path.dirname(link_path)
 | 
			
		||||
                relative_path = os.path.join(link_parent_dir, source_path)
 | 
			
		||||
                if os.path.exists(relative_path):
 | 
			
		||||
                    # In order to work on windows, the source path needs to be modified to be
 | 
			
		||||
                    # relative because hardlink/junction dont resolve relative paths the same
 | 
			
		||||
                    # way as os.symlink. This is ignored on other operating systems.
 | 
			
		||||
                    win_source_path = relative_path
 | 
			
		||||
                elif not allow_broken_symlinks:
 | 
			
		||||
                    raise SymlinkError(
 | 
			
		||||
                        f"The source path ({source_path}) is not relative to the link path "
 | 
			
		||||
                        f"({link_path}). Resulting link would be broken so not making link."
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
    # Create the symlink
 | 
			
		||||
    if sys.platform == "win32" and not _windows_can_symlink():
 | 
			
		||||
        _windows_create_link(win_source_path, link_path)
 | 
			
		||||
    else:
 | 
			
		||||
        os.symlink(source_path, link_path, target_is_directory=os.path.isdir(source_path))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def islink(path):
 | 
			
		||||
    return os.path.islink(path) or _win32_is_junction(path)
 | 
			
		||||
def islink(path: str) -> bool:
 | 
			
		||||
    """Override os.islink to give correct answer for spack logic.
 | 
			
		||||
 | 
			
		||||
    For Non-Windows: a link can be determined with the os.path.islink method.
 | 
			
		||||
    Windows-only methods will return false for other operating systems.
 | 
			
		||||
 | 
			
		||||
    For Windows: spack considers symlinks, hard links, and junctions to
 | 
			
		||||
    all be links, so if any of those are True, return True.
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
        path (str): path to check if it is a link.
 | 
			
		||||
 | 
			
		||||
    Returns:
 | 
			
		||||
         bool - whether the path is any kind link or not.
 | 
			
		||||
    """
 | 
			
		||||
    return any([os.path.islink(path), _windows_is_junction(path), _windows_is_hardlink(path)])
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# '_win32' functions based on
 | 
			
		||||
# https://github.com/Erotemic/ubelt/blob/master/ubelt/util_links.py
 | 
			
		||||
def _win32_junction(path, link):
 | 
			
		||||
    # junctions require absolute paths
 | 
			
		||||
    if not os.path.isabs(link):
 | 
			
		||||
        link = os.path.abspath(link)
 | 
			
		||||
def _windows_is_hardlink(path: str) -> bool:
 | 
			
		||||
    """Determines if a path is a windows hard link. This is accomplished
 | 
			
		||||
    by looking at the number of links using os.stat. A non-hard-linked file
 | 
			
		||||
    will have a st_nlink value of 1, whereas a hard link will have a value
 | 
			
		||||
    larger than 1. Note that both the original and hard-linked file will
 | 
			
		||||
    return True because they share the same inode.
 | 
			
		||||
 | 
			
		||||
    # os.symlink will fail if link exists, emulate the behavior here
 | 
			
		||||
    if exists(link):
 | 
			
		||||
        raise OSError(errno.EEXIST, "File  exists: %s -> %s" % (link, path))
 | 
			
		||||
    Args:
 | 
			
		||||
        path (str): Windows path to check for a hard link
 | 
			
		||||
 | 
			
		||||
    if not os.path.isabs(path):
 | 
			
		||||
        parent = os.path.join(link, os.pardir)
 | 
			
		||||
        path = os.path.join(parent, path)
 | 
			
		||||
        path = os.path.abspath(path)
 | 
			
		||||
    Returns:
 | 
			
		||||
         bool - Whether the path is a hard link or not.
 | 
			
		||||
    """
 | 
			
		||||
    if sys.platform != "win32" or os.path.islink(path) or not os.path.exists(path):
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    CreateHardLink(link, path)
 | 
			
		||||
    return os.stat(path).st_nlink > 1
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _windows_is_junction(path: str) -> bool:
 | 
			
		||||
    """Determines if a path is a windows junction. A junction can be
 | 
			
		||||
    determined using a bitwise AND operation between the file's
 | 
			
		||||
    attribute bitmask and the known junction bitmask (0x400).
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
        path (str): A non-file path
 | 
			
		||||
 | 
			
		||||
    Returns:
 | 
			
		||||
        bool - whether the path is a junction or not.
 | 
			
		||||
    """
 | 
			
		||||
    if sys.platform != "win32" or os.path.islink(path) or os.path.isfile(path):
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    import ctypes.wintypes
 | 
			
		||||
 | 
			
		||||
    get_file_attributes = ctypes.windll.kernel32.GetFileAttributesW  # type: ignore[attr-defined]
 | 
			
		||||
    get_file_attributes.argtypes = (ctypes.wintypes.LPWSTR,)
 | 
			
		||||
    get_file_attributes.restype = ctypes.wintypes.DWORD
 | 
			
		||||
 | 
			
		||||
    invalid_file_attributes = 0xFFFFFFFF
 | 
			
		||||
    reparse_point = 0x400
 | 
			
		||||
    file_attr = get_file_attributes(str(path))
 | 
			
		||||
 | 
			
		||||
    if file_attr == invalid_file_attributes:
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    return file_attr & reparse_point > 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@lang.memoized
 | 
			
		||||
def _win32_can_symlink():
 | 
			
		||||
def _windows_can_symlink() -> bool:
 | 
			
		||||
    """
 | 
			
		||||
    Determines if windows is able to make a symlink depending on
 | 
			
		||||
    the system configuration and the level of the user's permissions.
 | 
			
		||||
    """
 | 
			
		||||
    if sys.platform != "win32":
 | 
			
		||||
        tty.warn("windows_can_symlink method can't be used on non-Windows OS.")
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    tempdir = tempfile.mkdtemp()
 | 
			
		||||
 | 
			
		||||
    dpath = join(tempdir, "dpath")
 | 
			
		||||
    fpath = join(tempdir, "fpath.txt")
 | 
			
		||||
    dpath = os.path.join(tempdir, "dpath")
 | 
			
		||||
    fpath = os.path.join(tempdir, "fpath.txt")
 | 
			
		||||
 | 
			
		||||
    dlink = join(tempdir, "dlink")
 | 
			
		||||
    flink = join(tempdir, "flink.txt")
 | 
			
		||||
    dlink = os.path.join(tempdir, "dlink")
 | 
			
		||||
    flink = os.path.join(tempdir, "flink.txt")
 | 
			
		||||
 | 
			
		||||
    import llnl.util.filesystem as fs
 | 
			
		||||
 | 
			
		||||
@@ -96,24 +207,136 @@ def _win32_can_symlink():
 | 
			
		||||
    return can_symlink_directories and can_symlink_files
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _win32_is_junction(path):
 | 
			
		||||
def _windows_create_link(source: str, link: str):
 | 
			
		||||
    """
 | 
			
		||||
    Determines if a path is a win32 junction
 | 
			
		||||
    Attempts to create a Hard Link or Junction as an alternative
 | 
			
		||||
    to a symbolic link. This is called when symbolic links cannot
 | 
			
		||||
    be created.
 | 
			
		||||
    """
 | 
			
		||||
    if os.path.islink(path):
 | 
			
		||||
        return False
 | 
			
		||||
    if sys.platform != "win32":
 | 
			
		||||
        raise SymlinkError("windows_create_link method can't be used on non-Windows OS.")
 | 
			
		||||
    elif os.path.isdir(source):
 | 
			
		||||
        _windows_create_junction(source=source, link=link)
 | 
			
		||||
    elif os.path.isfile(source):
 | 
			
		||||
        _windows_create_hard_link(path=source, link=link)
 | 
			
		||||
    else:
 | 
			
		||||
        raise SymlinkError(
 | 
			
		||||
            f"Cannot create link from {source}. It is neither a file nor a directory."
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    if sys.platform == "win32":
 | 
			
		||||
        import ctypes.wintypes
 | 
			
		||||
 | 
			
		||||
        GetFileAttributes = ctypes.windll.kernel32.GetFileAttributesW
 | 
			
		||||
        GetFileAttributes.argtypes = (ctypes.wintypes.LPWSTR,)
 | 
			
		||||
        GetFileAttributes.restype = ctypes.wintypes.DWORD
 | 
			
		||||
def _windows_create_junction(source: str, link: str):
 | 
			
		||||
    """Duly verify that the path and link are eligible to create a junction,
 | 
			
		||||
    then create the junction.
 | 
			
		||||
    """
 | 
			
		||||
    if sys.platform != "win32":
 | 
			
		||||
        raise SymlinkError("windows_create_junction method can't be used on non-Windows OS.")
 | 
			
		||||
    elif not os.path.exists(source):
 | 
			
		||||
        raise SymlinkError("Source path does not exist, cannot create a junction.")
 | 
			
		||||
    elif os.path.lexists(link):
 | 
			
		||||
        raise SymlinkError("Link path already exists, cannot create a junction.")
 | 
			
		||||
    elif not os.path.isdir(source):
 | 
			
		||||
        raise SymlinkError("Source path is not a directory, cannot create a junction.")
 | 
			
		||||
 | 
			
		||||
        INVALID_FILE_ATTRIBUTES = 0xFFFFFFFF
 | 
			
		||||
        FILE_ATTRIBUTE_REPARSE_POINT = 0x400
 | 
			
		||||
    import subprocess
 | 
			
		||||
 | 
			
		||||
        res = GetFileAttributes(path)
 | 
			
		||||
        return res != INVALID_FILE_ATTRIBUTES and bool(res & FILE_ATTRIBUTE_REPARSE_POINT)
 | 
			
		||||
    cmd = ["cmd", "/C", "mklink", "/J", link, source]
 | 
			
		||||
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 | 
			
		||||
    out, err = proc.communicate()
 | 
			
		||||
    tty.debug(out.decode())
 | 
			
		||||
    if proc.returncode != 0:
 | 
			
		||||
        err = err.decode()
 | 
			
		||||
        tty.error(err)
 | 
			
		||||
        raise SymlinkError("Make junction command returned a non-zero return code.", err)
 | 
			
		||||
 | 
			
		||||
    return False
 | 
			
		||||
 | 
			
		||||
def _windows_create_hard_link(path: str, link: str):
 | 
			
		||||
    """Duly verify that the path and link are eligible to create a hard
 | 
			
		||||
    link, then create the hard link.
 | 
			
		||||
    """
 | 
			
		||||
    if sys.platform != "win32":
 | 
			
		||||
        raise SymlinkError("windows_create_hard_link method can't be used on non-Windows OS.")
 | 
			
		||||
    elif not os.path.exists(path):
 | 
			
		||||
        raise SymlinkError(f"File path {path} does not exist. Cannot create hard link.")
 | 
			
		||||
    elif os.path.lexists(link):
 | 
			
		||||
        raise SymlinkError(f"Link path ({link}) already exists. Cannot create hard link.")
 | 
			
		||||
    elif not os.path.isfile(path):
 | 
			
		||||
        raise SymlinkError(f"File path ({link}) is not a file. Cannot create hard link.")
 | 
			
		||||
    else:
 | 
			
		||||
        tty.debug(f"Creating hard link {link} pointing to {path}")
 | 
			
		||||
        CreateHardLink(link, path)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def readlink(path: str):
 | 
			
		||||
    """Spack utility to override of os.readlink method to work cross platform"""
 | 
			
		||||
    if _windows_is_hardlink(path):
 | 
			
		||||
        return _windows_read_hard_link(path)
 | 
			
		||||
    elif _windows_is_junction(path):
 | 
			
		||||
        return _windows_read_junction(path)
 | 
			
		||||
    else:
 | 
			
		||||
        return os.readlink(path)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _windows_read_hard_link(link: str) -> str:
 | 
			
		||||
    """Find all of the files that point to the same inode as the link"""
 | 
			
		||||
    if sys.platform != "win32":
 | 
			
		||||
        raise SymlinkError("Can't read hard link on non-Windows OS.")
 | 
			
		||||
    link = os.path.abspath(link)
 | 
			
		||||
    fsutil_cmd = ["fsutil", "hardlink", "list", link]
 | 
			
		||||
    proc = subprocess.Popen(fsutil_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
 | 
			
		||||
    out, err = proc.communicate()
 | 
			
		||||
    if proc.returncode != 0:
 | 
			
		||||
        raise SymlinkError(f"An error occurred while reading hard link: {err.decode()}")
 | 
			
		||||
 | 
			
		||||
    # fsutil response does not include the drive name, so append it back to each linked file.
 | 
			
		||||
    drive, link_tail = os.path.splitdrive(os.path.abspath(link))
 | 
			
		||||
    links = set([os.path.join(drive, p) for p in out.decode().splitlines()])
 | 
			
		||||
    links.remove(link)
 | 
			
		||||
    if len(links) == 1:
 | 
			
		||||
        return links.pop()
 | 
			
		||||
    elif len(links) > 1:
 | 
			
		||||
        # TODO: How best to handle the case where 3 or more paths point to a single inode?
 | 
			
		||||
        raise SymlinkError(f"Found multiple paths pointing to the same inode {links}")
 | 
			
		||||
    else:
 | 
			
		||||
        raise SymlinkError("Cannot determine hard link source path.")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _windows_read_junction(link: str):
 | 
			
		||||
    """Find the path that a junction points to."""
 | 
			
		||||
    if sys.platform != "win32":
 | 
			
		||||
        raise SymlinkError("Can't read junction on non-Windows OS.")
 | 
			
		||||
 | 
			
		||||
    link = os.path.abspath(link)
 | 
			
		||||
    link_basename = os.path.basename(link)
 | 
			
		||||
    link_parent = os.path.dirname(link)
 | 
			
		||||
    fsutil_cmd = ["dir", "/a:l", link_parent]
 | 
			
		||||
    proc = subprocess.Popen(fsutil_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
 | 
			
		||||
    out, err = proc.communicate()
 | 
			
		||||
    if proc.returncode != 0:
 | 
			
		||||
        raise SymlinkError(f"An error occurred while reading junction: {err.decode()}")
 | 
			
		||||
    matches = re.search(rf"<JUNCTION>\s+{link_basename} \[(.*)]", out.decode())
 | 
			
		||||
    if matches:
 | 
			
		||||
        return matches.group(1)
 | 
			
		||||
    else:
 | 
			
		||||
        raise SymlinkError("Could not find junction path.")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@system_path_filter
 | 
			
		||||
def resolve_link_target_relative_to_the_link(link):
 | 
			
		||||
    """
 | 
			
		||||
    os.path.isdir uses os.path.exists, which for links will check
 | 
			
		||||
    the existence of the link target. If the link target is relative to
 | 
			
		||||
    the link, we need to construct a pathname that is valid from
 | 
			
		||||
    our cwd (which may not be the same as the link's directory)
 | 
			
		||||
    """
 | 
			
		||||
    target = readlink(link)
 | 
			
		||||
    if os.path.isabs(target):
 | 
			
		||||
        return target
 | 
			
		||||
    link_dir = os.path.dirname(os.path.abspath(link))
 | 
			
		||||
    return os.path.join(link_dir, target)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SymlinkError(SpackError):
 | 
			
		||||
    """Exception class for errors raised while creating symlinks,
 | 
			
		||||
    junctions and hard links
 | 
			
		||||
    """
 | 
			
		||||
 
 | 
			
		||||
@@ -592,7 +592,9 @@ def dump_packages(spec: "spack.spec.Spec", path: str) -> None:
 | 
			
		||||
        if node is spec:
 | 
			
		||||
            spack.repo.PATH.dump_provenance(node, dest_pkg_dir)
 | 
			
		||||
        elif source_pkg_dir:
 | 
			
		||||
            fs.install_tree(source_pkg_dir, dest_pkg_dir)
 | 
			
		||||
            fs.install_tree(
 | 
			
		||||
                source_pkg_dir, dest_pkg_dir, allow_broken_symlinks=(sys.platform != "win32")
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_dependent_ids(spec: "spack.spec.Spec") -> List[str]:
 | 
			
		||||
@@ -1316,7 +1318,6 @@ def _prepare_for_install(self, task: BuildTask) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        Check the database and leftover installation directories/files and
 | 
			
		||||
        prepare for a new install attempt for an uninstalled package.
 | 
			
		||||
 | 
			
		||||
        Preparation includes cleaning up installation and stage directories
 | 
			
		||||
        and ensuring the database is up-to-date.
 | 
			
		||||
 | 
			
		||||
@@ -2394,7 +2395,9 @@ def _install_source(self) -> None:
 | 
			
		||||
        src_target = os.path.join(pkg.spec.prefix, "share", pkg.name, "src")
 | 
			
		||||
        tty.debug("{0} Copying source to {1}".format(self.pre, src_target))
 | 
			
		||||
 | 
			
		||||
        fs.install_tree(pkg.stage.source_path, src_target)
 | 
			
		||||
        fs.install_tree(
 | 
			
		||||
            pkg.stage.source_path, src_target, allow_broken_symlinks=(sys.platform != "win32")
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def _real_install(self) -> None:
 | 
			
		||||
        import spack.builder
 | 
			
		||||
 
 | 
			
		||||
@@ -4,6 +4,7 @@
 | 
			
		||||
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
 | 
			
		||||
import os
 | 
			
		||||
import platform
 | 
			
		||||
import sys
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
@@ -223,6 +224,7 @@ def test_concretize_target_ranges(root_target_range, dep_target_range, result, m
 | 
			
		||||
        (["21.11", "21.9"], None, False),
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
@pytest.mark.skipif(sys.platform == "win32", reason="Cray does not use windows")
 | 
			
		||||
def test_cray_platform_detection(versions, default, expected, tmpdir, monkeypatch, working_env):
 | 
			
		||||
    ex_path = str(tmpdir.join("fake_craype_dir"))
 | 
			
		||||
    fs.mkdirp(ex_path)
 | 
			
		||||
 
 | 
			
		||||
@@ -31,13 +31,16 @@ def test_fetch_missing_cache(tmpdir, _fetch_method):
 | 
			
		||||
@pytest.mark.parametrize("_fetch_method", ["curl", "urllib"])
 | 
			
		||||
def test_fetch(tmpdir, _fetch_method):
 | 
			
		||||
    """Ensure a fetch after expanding is effectively a no-op."""
 | 
			
		||||
    testpath = str(tmpdir)
 | 
			
		||||
    cache = os.path.join(testpath, "cache.tar.gz")
 | 
			
		||||
    cache_dir = tmpdir.join("cache")
 | 
			
		||||
    stage_dir = tmpdir.join("stage")
 | 
			
		||||
    mkdirp(cache_dir)
 | 
			
		||||
    mkdirp(stage_dir)
 | 
			
		||||
    cache = os.path.join(cache_dir, "cache.tar.gz")
 | 
			
		||||
    touch(cache)
 | 
			
		||||
    url = url_util.path_to_file_url(cache)
 | 
			
		||||
    with spack.config.override("config:url_fetch_method", _fetch_method):
 | 
			
		||||
        fetcher = CacheURLFetchStrategy(url=url)
 | 
			
		||||
        with Stage(fetcher, path=testpath) as stage:
 | 
			
		||||
        with Stage(fetcher, path=str(stage_dir)) as stage:
 | 
			
		||||
            source_path = stage.source_path
 | 
			
		||||
            mkdirp(source_path)
 | 
			
		||||
            fetcher.fetch()
 | 
			
		||||
 
 | 
			
		||||
@@ -27,7 +27,7 @@
 | 
			
		||||
import llnl.util.lang
 | 
			
		||||
import llnl.util.lock
 | 
			
		||||
import llnl.util.tty as tty
 | 
			
		||||
from llnl.util.filesystem import copy_tree, mkdirp, remove_linked_tree, working_dir
 | 
			
		||||
from llnl.util.filesystem import copy_tree, mkdirp, remove_linked_tree, touchp, working_dir
 | 
			
		||||
 | 
			
		||||
import spack.binary_distribution
 | 
			
		||||
import spack.caches
 | 
			
		||||
@@ -565,6 +565,8 @@ def mock_repo_path():
 | 
			
		||||
def _pkg_install_fn(pkg, spec, prefix):
 | 
			
		||||
    # sanity_check_prefix requires something in the install directory
 | 
			
		||||
    mkdirp(prefix.bin)
 | 
			
		||||
    if not os.path.exists(spec.package.build_log_path):
 | 
			
		||||
        touchp(spec.package.build_log_path)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
 
 | 
			
		||||
@@ -13,7 +13,8 @@
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
import llnl.util.filesystem as fs
 | 
			
		||||
from llnl.util.symlink import islink, symlink
 | 
			
		||||
import llnl.util.symlink
 | 
			
		||||
from llnl.util.symlink import SymlinkError, _windows_can_symlink, islink, symlink
 | 
			
		||||
 | 
			
		||||
import spack.paths
 | 
			
		||||
 | 
			
		||||
@@ -150,7 +151,6 @@ def test_multiple_src_file_dest(self, stage):
 | 
			
		||||
                fs.install("source/a/*/*", "dest/1")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.not_on_windows("Skip test on Windows")
 | 
			
		||||
class TestCopyTree:
 | 
			
		||||
    """Tests for ``filesystem.copy_tree``"""
 | 
			
		||||
 | 
			
		||||
@@ -189,7 +189,7 @@ def test_symlinks_true(self, stage):
 | 
			
		||||
    def test_symlinks_true_ignore(self, stage):
 | 
			
		||||
        """Test copying when specifying relative paths that should be ignored"""
 | 
			
		||||
        with fs.working_dir(str(stage)):
 | 
			
		||||
            ignore = lambda p: p in ["c/d/e", "a"]
 | 
			
		||||
            ignore = lambda p: p in [os.path.join("c", "d", "e"), "a"]
 | 
			
		||||
            fs.copy_tree("source", "dest", symlinks=True, ignore=ignore)
 | 
			
		||||
            assert not os.path.exists("dest/a")
 | 
			
		||||
            assert os.path.exists("dest/c/d")
 | 
			
		||||
@@ -231,7 +231,6 @@ def test_parent_dir(self, stage):
 | 
			
		||||
                fs.copy_tree("source", "source/sub/directory")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.not_on_windows("Skip test on Windows")
 | 
			
		||||
class TestInstallTree:
 | 
			
		||||
    """Tests for ``filesystem.install_tree``"""
 | 
			
		||||
 | 
			
		||||
@@ -275,6 +274,15 @@ def test_symlinks_false(self, stage):
 | 
			
		||||
                assert not os.path.islink("dest/2")
 | 
			
		||||
            check_added_exe_permissions("source/2", "dest/2")
 | 
			
		||||
 | 
			
		||||
    @pytest.mark.skipif(sys.platform == "win32", reason="Broken symlinks not allowed on Windows")
 | 
			
		||||
    def test_allow_broken_symlinks(self, stage):
 | 
			
		||||
        """Test installing with a broken symlink."""
 | 
			
		||||
        with fs.working_dir(str(stage)):
 | 
			
		||||
            symlink("nonexistant.txt", "source/broken", allow_broken_symlinks=True)
 | 
			
		||||
            fs.install_tree("source", "dest", symlinks=True, allow_broken_symlinks=True)
 | 
			
		||||
            assert os.path.islink("dest/broken")
 | 
			
		||||
            assert not os.path.exists(os.readlink("dest/broken"))
 | 
			
		||||
 | 
			
		||||
    def test_glob_src(self, stage):
 | 
			
		||||
        """Test using a glob as the source."""
 | 
			
		||||
 | 
			
		||||
@@ -746,6 +754,7 @@ def test_is_nonsymlink_exe_with_shebang(tmpdir):
 | 
			
		||||
        assert not fs.is_nonsymlink_exe_with_shebang("symlink_to_executable_script")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.skipif(sys.platform == "win32", reason="Unix-only test.")
 | 
			
		||||
def test_lexists_islink_isdir(tmpdir):
 | 
			
		||||
    root = str(tmpdir)
 | 
			
		||||
 | 
			
		||||
@@ -764,12 +773,12 @@ def test_lexists_islink_isdir(tmpdir):
 | 
			
		||||
    with open(file, "wb") as f:
 | 
			
		||||
        f.write(b"file")
 | 
			
		||||
 | 
			
		||||
    os.symlink("dir", symlink_to_dir)
 | 
			
		||||
    os.symlink("file", symlink_to_file)
 | 
			
		||||
    os.symlink("does_not_exist", dangling_symlink)
 | 
			
		||||
    os.symlink("dangling_symlink", symlink_to_dangling_symlink)
 | 
			
		||||
    os.symlink("symlink_to_dir", symlink_to_symlink_to_dir)
 | 
			
		||||
    os.symlink("symlink_to_file", symlink_to_symlink_to_file)
 | 
			
		||||
    symlink("dir", symlink_to_dir)
 | 
			
		||||
    symlink("file", symlink_to_file)
 | 
			
		||||
    symlink("does_not_exist", dangling_symlink)
 | 
			
		||||
    symlink("dangling_symlink", symlink_to_dangling_symlink)
 | 
			
		||||
    symlink("symlink_to_dir", symlink_to_symlink_to_dir)
 | 
			
		||||
    symlink("symlink_to_file", symlink_to_symlink_to_file)
 | 
			
		||||
 | 
			
		||||
    assert fs.lexists_islink_isdir(dir) == (True, False, True)
 | 
			
		||||
    assert fs.lexists_islink_isdir(file) == (True, False, False)
 | 
			
		||||
@@ -781,6 +790,57 @@ def test_lexists_islink_isdir(tmpdir):
 | 
			
		||||
    assert fs.lexists_islink_isdir(symlink_to_symlink_to_file) == (True, True, False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.skipif(sys.platform != "win32", reason="For Windows Only")
 | 
			
		||||
@pytest.mark.parametrize("win_can_symlink", [True, False])
 | 
			
		||||
def test_lexists_islink_isdir_windows(tmpdir, monkeypatch, win_can_symlink):
 | 
			
		||||
    """Run on windows without elevated privileges to test junctions and hard links which have
 | 
			
		||||
    different results from the lexists_islink_isdir method.
 | 
			
		||||
    """
 | 
			
		||||
    if win_can_symlink and not _windows_can_symlink():
 | 
			
		||||
        pytest.skip("Cannot test dev mode behavior without dev mode enabled.")
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        monkeypatch.setattr(llnl.util.symlink, "_windows_can_symlink", lambda: win_can_symlink)
 | 
			
		||||
        dir = str(tmpdir.join("dir"))
 | 
			
		||||
        file = str(tmpdir.join("file"))
 | 
			
		||||
        nonexistent = str(tmpdir.join("does_not_exist"))
 | 
			
		||||
        symlink_to_dir = str(tmpdir.join("symlink_to_dir"))
 | 
			
		||||
        symlink_to_file = str(tmpdir.join("symlink_to_file"))
 | 
			
		||||
        dangling_symlink = str(tmpdir.join("dangling_symlink"))
 | 
			
		||||
        symlink_to_dangling_symlink = str(tmpdir.join("symlink_to_dangling_symlink"))
 | 
			
		||||
        symlink_to_symlink_to_dir = str(tmpdir.join("symlink_to_symlink_to_dir"))
 | 
			
		||||
        symlink_to_symlink_to_file = str(tmpdir.join("symlink_to_symlink_to_file"))
 | 
			
		||||
 | 
			
		||||
        os.mkdir(dir)
 | 
			
		||||
        assert fs.lexists_islink_isdir(dir) == (True, False, True)
 | 
			
		||||
 | 
			
		||||
        symlink("dir", symlink_to_dir)
 | 
			
		||||
        assert fs.lexists_islink_isdir(dir) == (True, False, True)
 | 
			
		||||
        assert fs.lexists_islink_isdir(symlink_to_dir) == (True, True, True)
 | 
			
		||||
 | 
			
		||||
        with open(file, "wb") as f:
 | 
			
		||||
            f.write(b"file")
 | 
			
		||||
        assert fs.lexists_islink_isdir(file) == (True, False, False)
 | 
			
		||||
 | 
			
		||||
        symlink("file", symlink_to_file)
 | 
			
		||||
        if win_can_symlink:
 | 
			
		||||
            assert fs.lexists_islink_isdir(file) == (True, False, False)
 | 
			
		||||
        else:
 | 
			
		||||
            assert fs.lexists_islink_isdir(file) == (True, True, False)
 | 
			
		||||
        assert fs.lexists_islink_isdir(symlink_to_file) == (True, True, False)
 | 
			
		||||
 | 
			
		||||
        with pytest.raises(SymlinkError):
 | 
			
		||||
            symlink("does_not_exist", dangling_symlink)
 | 
			
		||||
            symlink("dangling_symlink", symlink_to_dangling_symlink)
 | 
			
		||||
 | 
			
		||||
        symlink("symlink_to_dir", symlink_to_symlink_to_dir)
 | 
			
		||||
        symlink("symlink_to_file", symlink_to_symlink_to_file)
 | 
			
		||||
 | 
			
		||||
        assert fs.lexists_islink_isdir(nonexistent) == (False, False, False)
 | 
			
		||||
        assert fs.lexists_islink_isdir(symlink_to_dangling_symlink) == (False, False, False)
 | 
			
		||||
        assert fs.lexists_islink_isdir(symlink_to_symlink_to_dir) == (True, True, True)
 | 
			
		||||
        assert fs.lexists_islink_isdir(symlink_to_symlink_to_file) == (True, True, False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class RegisterVisitor(fs.BaseDirectoryVisitor):
 | 
			
		||||
    """A directory visitor that keeps track of all visited paths"""
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -4,12 +4,14 @@
 | 
			
		||||
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
 | 
			
		||||
 | 
			
		||||
import os
 | 
			
		||||
import sys
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
import llnl.util.symlink
 | 
			
		||||
from llnl.util.filesystem import mkdirp, touchp, visit_directory_tree, working_dir
 | 
			
		||||
from llnl.util.link_tree import DestinationMergeVisitor, LinkTree, SourceMergeVisitor
 | 
			
		||||
from llnl.util.symlink import islink
 | 
			
		||||
from llnl.util.symlink import _windows_can_symlink, islink, readlink, symlink
 | 
			
		||||
 | 
			
		||||
from spack.stage import Stage
 | 
			
		||||
 | 
			
		||||
@@ -44,77 +46,116 @@ def link_tree(stage):
 | 
			
		||||
def check_file_link(filename, expected_target):
 | 
			
		||||
    assert os.path.isfile(filename)
 | 
			
		||||
    assert islink(filename)
 | 
			
		||||
    assert os.path.abspath(os.path.realpath(filename)) == os.path.abspath(expected_target)
 | 
			
		||||
    if sys.platform != "win32" or llnl.util.symlink._windows_can_symlink():
 | 
			
		||||
        assert os.path.abspath(os.path.realpath(filename)) == os.path.abspath(expected_target)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_dir(filename):
 | 
			
		||||
    assert os.path.isdir(filename)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_merge_to_new_directory(stage, link_tree):
 | 
			
		||||
@pytest.mark.parametrize("run_as_root", [True, False])
 | 
			
		||||
def test_merge_to_new_directory(stage, link_tree, monkeypatch, run_as_root):
 | 
			
		||||
    if sys.platform != "win32":
 | 
			
		||||
        if run_as_root:
 | 
			
		||||
            pass
 | 
			
		||||
        else:
 | 
			
		||||
            pytest.skip("Skipping duplicate test.")
 | 
			
		||||
    elif _windows_can_symlink() or not run_as_root:
 | 
			
		||||
        monkeypatch.setattr(llnl.util.symlink, "_windows_can_symlink", lambda: run_as_root)
 | 
			
		||||
    else:
 | 
			
		||||
        # Skip if trying to run as dev-mode without having dev-mode.
 | 
			
		||||
        pytest.skip("Skipping portion of test which required dev-mode privileges.")
 | 
			
		||||
 | 
			
		||||
    with working_dir(stage.path):
 | 
			
		||||
        link_tree.merge("dest")
 | 
			
		||||
 | 
			
		||||
        check_file_link("dest/1", "source/1")
 | 
			
		||||
        check_file_link("dest/a/b/2", "source/a/b/2")
 | 
			
		||||
        check_file_link("dest/a/b/3", "source/a/b/3")
 | 
			
		||||
        check_file_link("dest/c/4", "source/c/4")
 | 
			
		||||
        check_file_link("dest/c/d/5", "source/c/d/5")
 | 
			
		||||
        check_file_link("dest/c/d/6", "source/c/d/6")
 | 
			
		||||
        check_file_link("dest/c/d/e/7", "source/c/d/e/7")
 | 
			
		||||
        files = [
 | 
			
		||||
            ("dest/1", "source/1"),
 | 
			
		||||
            ("dest/a/b/2", "source/a/b/2"),
 | 
			
		||||
            ("dest/a/b/3", "source/a/b/3"),
 | 
			
		||||
            ("dest/c/4", "source/c/4"),
 | 
			
		||||
            ("dest/c/d/5", "source/c/d/5"),
 | 
			
		||||
            ("dest/c/d/6", "source/c/d/6"),
 | 
			
		||||
            ("dest/c/d/e/7", "source/c/d/e/7"),
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
        assert os.path.isabs(os.readlink("dest/1"))
 | 
			
		||||
        assert os.path.isabs(os.readlink("dest/a/b/2"))
 | 
			
		||||
        assert os.path.isabs(os.readlink("dest/a/b/3"))
 | 
			
		||||
        assert os.path.isabs(os.readlink("dest/c/4"))
 | 
			
		||||
        assert os.path.isabs(os.readlink("dest/c/d/5"))
 | 
			
		||||
        assert os.path.isabs(os.readlink("dest/c/d/6"))
 | 
			
		||||
        assert os.path.isabs(os.readlink("dest/c/d/e/7"))
 | 
			
		||||
        for dest, source in files:
 | 
			
		||||
            check_file_link(dest, source)
 | 
			
		||||
            assert os.path.isabs(readlink(dest))
 | 
			
		||||
 | 
			
		||||
        link_tree.unmerge("dest")
 | 
			
		||||
 | 
			
		||||
        assert not os.path.exists("dest")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_merge_to_new_directory_relative(stage, link_tree):
 | 
			
		||||
@pytest.mark.parametrize("run_as_root", [True, False])
 | 
			
		||||
def test_merge_to_new_directory_relative(stage, link_tree, monkeypatch, run_as_root):
 | 
			
		||||
    if sys.platform != "win32":
 | 
			
		||||
        if run_as_root:
 | 
			
		||||
            pass
 | 
			
		||||
        else:
 | 
			
		||||
            pytest.skip("Skipping duplicate test.")
 | 
			
		||||
    elif _windows_can_symlink() or not run_as_root:
 | 
			
		||||
        monkeypatch.setattr(llnl.util.symlink, "_windows_can_symlink", lambda: run_as_root)
 | 
			
		||||
    else:
 | 
			
		||||
        # Skip if trying to run as dev-mode without having dev-mode.
 | 
			
		||||
        pytest.skip("Skipping portion of test which required dev-mode privileges.")
 | 
			
		||||
 | 
			
		||||
    with working_dir(stage.path):
 | 
			
		||||
        link_tree.merge("dest", relative=True)
 | 
			
		||||
 | 
			
		||||
        check_file_link("dest/1", "source/1")
 | 
			
		||||
        check_file_link("dest/a/b/2", "source/a/b/2")
 | 
			
		||||
        check_file_link("dest/a/b/3", "source/a/b/3")
 | 
			
		||||
        check_file_link("dest/c/4", "source/c/4")
 | 
			
		||||
        check_file_link("dest/c/d/5", "source/c/d/5")
 | 
			
		||||
        check_file_link("dest/c/d/6", "source/c/d/6")
 | 
			
		||||
        check_file_link("dest/c/d/e/7", "source/c/d/e/7")
 | 
			
		||||
        files = [
 | 
			
		||||
            ("dest/1", "source/1"),
 | 
			
		||||
            ("dest/a/b/2", "source/a/b/2"),
 | 
			
		||||
            ("dest/a/b/3", "source/a/b/3"),
 | 
			
		||||
            ("dest/c/4", "source/c/4"),
 | 
			
		||||
            ("dest/c/d/5", "source/c/d/5"),
 | 
			
		||||
            ("dest/c/d/6", "source/c/d/6"),
 | 
			
		||||
            ("dest/c/d/e/7", "source/c/d/e/7"),
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
        assert not os.path.isabs(os.readlink("dest/1"))
 | 
			
		||||
        assert not os.path.isabs(os.readlink("dest/a/b/2"))
 | 
			
		||||
        assert not os.path.isabs(os.readlink("dest/a/b/3"))
 | 
			
		||||
        assert not os.path.isabs(os.readlink("dest/c/4"))
 | 
			
		||||
        assert not os.path.isabs(os.readlink("dest/c/d/5"))
 | 
			
		||||
        assert not os.path.isabs(os.readlink("dest/c/d/6"))
 | 
			
		||||
        assert not os.path.isabs(os.readlink("dest/c/d/e/7"))
 | 
			
		||||
        for dest, source in files:
 | 
			
		||||
            check_file_link(dest, source)
 | 
			
		||||
            # Hard links/junctions are inherently absolute.
 | 
			
		||||
            if sys.platform != "win32" or run_as_root:
 | 
			
		||||
                assert not os.path.isabs(readlink(dest))
 | 
			
		||||
 | 
			
		||||
        link_tree.unmerge("dest")
 | 
			
		||||
 | 
			
		||||
        assert not os.path.exists("dest")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_merge_to_existing_directory(stage, link_tree):
 | 
			
		||||
@pytest.mark.parametrize("run_as_root", [True, False])
 | 
			
		||||
def test_merge_to_existing_directory(stage, link_tree, monkeypatch, run_as_root):
 | 
			
		||||
    if sys.platform != "win32":
 | 
			
		||||
        if run_as_root:
 | 
			
		||||
            pass
 | 
			
		||||
        else:
 | 
			
		||||
            pytest.skip("Skipping duplicate test.")
 | 
			
		||||
    elif _windows_can_symlink() or not run_as_root:
 | 
			
		||||
        monkeypatch.setattr(llnl.util.symlink, "_windows_can_symlink", lambda: run_as_root)
 | 
			
		||||
    else:
 | 
			
		||||
        # Skip if trying to run as dev-mode without having dev-mode.
 | 
			
		||||
        pytest.skip("Skipping portion of test which required dev-mode privileges.")
 | 
			
		||||
 | 
			
		||||
    with working_dir(stage.path):
 | 
			
		||||
        touchp("dest/x")
 | 
			
		||||
        touchp("dest/a/b/y")
 | 
			
		||||
 | 
			
		||||
        link_tree.merge("dest")
 | 
			
		||||
 | 
			
		||||
        check_file_link("dest/1", "source/1")
 | 
			
		||||
        check_file_link("dest/a/b/2", "source/a/b/2")
 | 
			
		||||
        check_file_link("dest/a/b/3", "source/a/b/3")
 | 
			
		||||
        check_file_link("dest/c/4", "source/c/4")
 | 
			
		||||
        check_file_link("dest/c/d/5", "source/c/d/5")
 | 
			
		||||
        check_file_link("dest/c/d/6", "source/c/d/6")
 | 
			
		||||
        check_file_link("dest/c/d/e/7", "source/c/d/e/7")
 | 
			
		||||
        files = [
 | 
			
		||||
            ("dest/1", "source/1"),
 | 
			
		||||
            ("dest/a/b/2", "source/a/b/2"),
 | 
			
		||||
            ("dest/a/b/3", "source/a/b/3"),
 | 
			
		||||
            ("dest/c/4", "source/c/4"),
 | 
			
		||||
            ("dest/c/d/5", "source/c/d/5"),
 | 
			
		||||
            ("dest/c/d/6", "source/c/d/6"),
 | 
			
		||||
            ("dest/c/d/e/7", "source/c/d/e/7"),
 | 
			
		||||
        ]
 | 
			
		||||
        for dest, source in files:
 | 
			
		||||
            check_file_link(dest, source)
 | 
			
		||||
 | 
			
		||||
        assert os.path.isfile("dest/x")
 | 
			
		||||
        assert os.path.isfile("dest/a/b/y")
 | 
			
		||||
@@ -124,13 +165,8 @@ def test_merge_to_existing_directory(stage, link_tree):
 | 
			
		||||
        assert os.path.isfile("dest/x")
 | 
			
		||||
        assert os.path.isfile("dest/a/b/y")
 | 
			
		||||
 | 
			
		||||
        assert not os.path.isfile("dest/1")
 | 
			
		||||
        assert not os.path.isfile("dest/a/b/2")
 | 
			
		||||
        assert not os.path.isfile("dest/a/b/3")
 | 
			
		||||
        assert not os.path.isfile("dest/c/4")
 | 
			
		||||
        assert not os.path.isfile("dest/c/d/5")
 | 
			
		||||
        assert not os.path.isfile("dest/c/d/6")
 | 
			
		||||
        assert not os.path.isfile("dest/c/d/e/7")
 | 
			
		||||
        for dest, _ in files:
 | 
			
		||||
            assert not os.path.isfile(dest)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_merge_with_empty_directories(stage, link_tree):
 | 
			
		||||
@@ -192,9 +228,9 @@ def test_source_merge_visitor_does_not_follow_symlinked_dirs_at_depth(tmpdir):
 | 
			
		||||
        os.mkdir(j("a", "b"))
 | 
			
		||||
        os.mkdir(j("a", "b", "c"))
 | 
			
		||||
        os.mkdir(j("a", "b", "c", "d"))
 | 
			
		||||
        os.symlink(j("b"), j("a", "symlink_b"))
 | 
			
		||||
        os.symlink(j("c"), j("a", "b", "symlink_c"))
 | 
			
		||||
        os.symlink(j("d"), j("a", "b", "c", "symlink_d"))
 | 
			
		||||
        symlink(j("b"), j("a", "symlink_b"))
 | 
			
		||||
        symlink(j("c"), j("a", "b", "symlink_c"))
 | 
			
		||||
        symlink(j("d"), j("a", "b", "c", "symlink_d"))
 | 
			
		||||
        with open(j("a", "b", "c", "d", "file"), "wb"):
 | 
			
		||||
            pass
 | 
			
		||||
 | 
			
		||||
@@ -236,10 +272,11 @@ def test_source_merge_visitor_cant_be_cyclical(tmpdir):
 | 
			
		||||
    j = os.path.join
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        os.mkdir(j("a"))
 | 
			
		||||
        os.symlink(j("..", "b"), j("a", "symlink_b"))
 | 
			
		||||
        os.symlink(j("symlink_b"), j("a", "symlink_b_b"))
 | 
			
		||||
        os.mkdir(j("b"))
 | 
			
		||||
        os.symlink(j("..", "a"), j("b", "symlink_a"))
 | 
			
		||||
 | 
			
		||||
        symlink(j("..", "b"), j("a", "symlink_b"))
 | 
			
		||||
        symlink(j("symlink_b"), j("a", "symlink_b_b"))
 | 
			
		||||
        symlink(j("..", "a"), j("b", "symlink_a"))
 | 
			
		||||
 | 
			
		||||
    visitor = SourceMergeVisitor()
 | 
			
		||||
    visit_directory_tree(str(tmpdir), visitor)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										247
									
								
								lib/spack/spack/test/llnl/util/symlink.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										247
									
								
								lib/spack/spack/test/llnl/util/symlink.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,247 @@
 | 
			
		||||
# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
 | 
			
		||||
# Spack Project Developers. See the top-level COPYRIGHT file for details.
 | 
			
		||||
#
 | 
			
		||||
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
 | 
			
		||||
 | 
			
		||||
"""Tests for ``llnl/util/symlink.py``"""
 | 
			
		||||
import os
 | 
			
		||||
import sys
 | 
			
		||||
import tempfile
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from llnl.util import symlink
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_symlink_file(tmpdir):
 | 
			
		||||
    """Test the symlink.symlink functionality on all operating systems for a file"""
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        test_dir = str(tmpdir)
 | 
			
		||||
        fd, real_file = tempfile.mkstemp(prefix="real", suffix=".txt", dir=test_dir)
 | 
			
		||||
        link_file = str(tmpdir.join("link.txt"))
 | 
			
		||||
        assert os.path.exists(link_file) is False
 | 
			
		||||
        symlink.symlink(source_path=real_file, link_path=link_file)
 | 
			
		||||
        assert os.path.exists(link_file)
 | 
			
		||||
        assert symlink.islink(link_file)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_symlink_dir(tmpdir):
 | 
			
		||||
    """Test the symlink.symlink functionality on all operating systems for a directory"""
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        test_dir = str(tmpdir)
 | 
			
		||||
        real_dir = os.path.join(test_dir, "real_dir")
 | 
			
		||||
        link_dir = os.path.join(test_dir, "link_dir")
 | 
			
		||||
        os.mkdir(real_dir)
 | 
			
		||||
        symlink.symlink(source_path=real_dir, link_path=link_dir)
 | 
			
		||||
        assert os.path.exists(link_dir)
 | 
			
		||||
        assert symlink.islink(link_dir)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_symlink_source_not_exists(tmpdir):
 | 
			
		||||
    """Test the symlink.symlink method for the case where a source path does not exist"""
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        test_dir = str(tmpdir)
 | 
			
		||||
        real_dir = os.path.join(test_dir, "real_dir")
 | 
			
		||||
        link_dir = os.path.join(test_dir, "link_dir")
 | 
			
		||||
        with pytest.raises(symlink.SymlinkError):
 | 
			
		||||
            symlink.symlink(source_path=real_dir, link_path=link_dir, allow_broken_symlinks=False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_symlink_src_relative_to_link(tmpdir):
 | 
			
		||||
    """Test the symlink.symlink functionality where the source value exists relative to the link
 | 
			
		||||
    but not relative to the cwd"""
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        subdir_1 = tmpdir.join("a")
 | 
			
		||||
        subdir_2 = os.path.join(subdir_1, "b")
 | 
			
		||||
        link_dir = os.path.join(subdir_1, "c")
 | 
			
		||||
 | 
			
		||||
        os.mkdir(subdir_1)
 | 
			
		||||
        os.mkdir(subdir_2)
 | 
			
		||||
 | 
			
		||||
        fd, real_file = tempfile.mkstemp(prefix="real", suffix=".txt", dir=subdir_2)
 | 
			
		||||
        link_file = os.path.join(subdir_1, "link.txt")
 | 
			
		||||
 | 
			
		||||
        symlink.symlink(
 | 
			
		||||
            source_path=f"b/{os.path.basename(real_file)}",
 | 
			
		||||
            link_path=f"a/{os.path.basename(link_file)}",
 | 
			
		||||
        )
 | 
			
		||||
        assert os.path.exists(link_file)
 | 
			
		||||
        assert symlink.islink(link_file)
 | 
			
		||||
        # Check dirs
 | 
			
		||||
        assert not os.path.lexists(link_dir)
 | 
			
		||||
        symlink.symlink(source_path="b", link_path="a/c")
 | 
			
		||||
        assert os.path.lexists(link_dir)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_symlink_src_not_relative_to_link(tmpdir):
 | 
			
		||||
    """Test the symlink.symlink functionality where the source value does not exist relative to
 | 
			
		||||
    the link and not relative to the cwd. NOTE that this symlink api call is EXPECTED to raise
 | 
			
		||||
    a symlink.SymlinkError exception that we catch."""
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        test_dir = str(tmpdir)
 | 
			
		||||
        subdir_1 = os.path.join(test_dir, "a")
 | 
			
		||||
        subdir_2 = os.path.join(subdir_1, "b")
 | 
			
		||||
        link_dir = os.path.join(subdir_1, "c")
 | 
			
		||||
        os.mkdir(subdir_1)
 | 
			
		||||
        os.mkdir(subdir_2)
 | 
			
		||||
        fd, real_file = tempfile.mkstemp(prefix="real", suffix=".txt", dir=subdir_2)
 | 
			
		||||
        link_file = str(tmpdir.join("link.txt"))
 | 
			
		||||
        # Expected SymlinkError because source path does not exist relative to link path
 | 
			
		||||
        with pytest.raises(symlink.SymlinkError):
 | 
			
		||||
            symlink.symlink(
 | 
			
		||||
                source_path=f"d/{os.path.basename(real_file)}",
 | 
			
		||||
                link_path=f"a/{os.path.basename(link_file)}",
 | 
			
		||||
                allow_broken_symlinks=False,
 | 
			
		||||
            )
 | 
			
		||||
        assert not os.path.exists(link_file)
 | 
			
		||||
        # Check dirs
 | 
			
		||||
        assert not os.path.lexists(link_dir)
 | 
			
		||||
        with pytest.raises(symlink.SymlinkError):
 | 
			
		||||
            symlink.symlink(source_path="d", link_path="a/c", allow_broken_symlinks=False)
 | 
			
		||||
        assert not os.path.lexists(link_dir)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_symlink_link_already_exists(tmpdir):
 | 
			
		||||
    """Test the symlink.symlink method for the case where a link already exists"""
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        test_dir = str(tmpdir)
 | 
			
		||||
        real_dir = os.path.join(test_dir, "real_dir")
 | 
			
		||||
        link_dir = os.path.join(test_dir, "link_dir")
 | 
			
		||||
        os.mkdir(real_dir)
 | 
			
		||||
        symlink.symlink(real_dir, link_dir, allow_broken_symlinks=False)
 | 
			
		||||
        assert os.path.exists(link_dir)
 | 
			
		||||
        with pytest.raises(symlink.SymlinkError):
 | 
			
		||||
            symlink.symlink(source_path=real_dir, link_path=link_dir, allow_broken_symlinks=False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.skipif(not symlink._windows_can_symlink(), reason="Test requires elevated privileges")
 | 
			
		||||
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows")
 | 
			
		||||
def test_symlink_win_file(tmpdir):
 | 
			
		||||
    """Check that symlink.symlink makes a symlink file when run with elevated permissions"""
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        test_dir = str(tmpdir)
 | 
			
		||||
        fd, real_file = tempfile.mkstemp(prefix="real", suffix=".txt", dir=test_dir)
 | 
			
		||||
        link_file = str(tmpdir.join("link.txt"))
 | 
			
		||||
        symlink.symlink(source_path=real_file, link_path=link_file)
 | 
			
		||||
        # Verify that all expected conditions are met
 | 
			
		||||
        assert os.path.exists(link_file)
 | 
			
		||||
        assert symlink.islink(link_file)
 | 
			
		||||
        assert os.path.islink(link_file)
 | 
			
		||||
        assert not symlink._windows_is_hardlink(link_file)
 | 
			
		||||
        assert not symlink._windows_is_junction(link_file)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.skipif(not symlink._windows_can_symlink(), reason="Test requires elevated privileges")
 | 
			
		||||
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows")
 | 
			
		||||
def test_symlink_win_dir(tmpdir):
 | 
			
		||||
    """Check that symlink.symlink makes a symlink dir when run with elevated permissions"""
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        test_dir = str(tmpdir)
 | 
			
		||||
        real_dir = os.path.join(test_dir, "real")
 | 
			
		||||
        link_dir = os.path.join(test_dir, "link")
 | 
			
		||||
        os.mkdir(real_dir)
 | 
			
		||||
        symlink.symlink(source_path=real_dir, link_path=link_dir)
 | 
			
		||||
        # Verify that all expected conditions are met
 | 
			
		||||
        assert os.path.exists(link_dir)
 | 
			
		||||
        assert symlink.islink(link_dir)
 | 
			
		||||
        assert os.path.islink(link_dir)
 | 
			
		||||
        assert not symlink._windows_is_hardlink(link_dir)
 | 
			
		||||
        assert not symlink._windows_is_junction(link_dir)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows")
 | 
			
		||||
def test_windows_create_junction(tmpdir):
 | 
			
		||||
    """Test the symlink._windows_create_junction method"""
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        test_dir = str(tmpdir)
 | 
			
		||||
        junction_real_dir = os.path.join(test_dir, "real_dir")
 | 
			
		||||
        junction_link_dir = os.path.join(test_dir, "link_dir")
 | 
			
		||||
        os.mkdir(junction_real_dir)
 | 
			
		||||
        symlink._windows_create_junction(junction_real_dir, junction_link_dir)
 | 
			
		||||
        # Verify that all expected conditions are met
 | 
			
		||||
        assert os.path.exists(junction_link_dir)
 | 
			
		||||
        assert symlink._windows_is_junction(junction_link_dir)
 | 
			
		||||
        assert symlink.islink(junction_link_dir)
 | 
			
		||||
        assert not os.path.islink(junction_link_dir)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows")
 | 
			
		||||
def test_windows_create_hard_link(tmpdir):
 | 
			
		||||
    """Test the symlink._windows_create_hard_link method"""
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        test_dir = str(tmpdir)
 | 
			
		||||
        fd, real_file = tempfile.mkstemp(prefix="real", suffix=".txt", dir=test_dir)
 | 
			
		||||
        link_file = str(tmpdir.join("link.txt"))
 | 
			
		||||
        symlink._windows_create_hard_link(real_file, link_file)
 | 
			
		||||
        # Verify that all expected conditions are met
 | 
			
		||||
        assert os.path.exists(link_file)
 | 
			
		||||
        assert symlink._windows_is_hardlink(real_file)
 | 
			
		||||
        assert symlink._windows_is_hardlink(link_file)
 | 
			
		||||
        assert symlink.islink(link_file)
 | 
			
		||||
        assert not os.path.islink(link_file)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows")
 | 
			
		||||
def test_windows_create_link_dir(tmpdir):
 | 
			
		||||
    """Test the functionality of the windows_create_link method with a directory
 | 
			
		||||
    which should result in making a junction.
 | 
			
		||||
    """
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        test_dir = str(tmpdir)
 | 
			
		||||
        real_dir = os.path.join(test_dir, "real")
 | 
			
		||||
        link_dir = os.path.join(test_dir, "link")
 | 
			
		||||
        os.mkdir(real_dir)
 | 
			
		||||
        symlink._windows_create_link(real_dir, link_dir)
 | 
			
		||||
        # Verify that all expected conditions are met
 | 
			
		||||
        assert os.path.exists(link_dir)
 | 
			
		||||
        assert symlink.islink(link_dir)
 | 
			
		||||
        assert not symlink._windows_is_hardlink(link_dir)
 | 
			
		||||
        assert symlink._windows_is_junction(link_dir)
 | 
			
		||||
        assert not os.path.islink(link_dir)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows")
 | 
			
		||||
def test_windows_create_link_file(tmpdir):
 | 
			
		||||
    """Test the functionality of the windows_create_link method with a file
 | 
			
		||||
    which should result in the creation of a hard link. It also tests the
 | 
			
		||||
    functionality of the symlink islink infrastructure.
 | 
			
		||||
    """
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        test_dir = str(tmpdir)
 | 
			
		||||
        fd, real_file = tempfile.mkstemp(prefix="real", suffix=".txt", dir=test_dir)
 | 
			
		||||
        link_file = str(tmpdir.join("link.txt"))
 | 
			
		||||
        symlink._windows_create_link(real_file, link_file)
 | 
			
		||||
        # Verify that all expected conditions are met
 | 
			
		||||
        assert symlink._windows_is_hardlink(link_file)
 | 
			
		||||
        assert symlink.islink(link_file)
 | 
			
		||||
        assert not symlink._windows_is_junction(link_file)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows")
 | 
			
		||||
def test_windows_read_link(tmpdir):
 | 
			
		||||
    """Makes sure symlink.readlink can read the link source for hard links and
 | 
			
		||||
    junctions on windows."""
 | 
			
		||||
    with tmpdir.as_cwd():
 | 
			
		||||
        real_dir_1 = "real_dir_1"
 | 
			
		||||
        real_dir_2 = "real_dir_2"
 | 
			
		||||
        link_dir_1 = "link_dir_1"
 | 
			
		||||
        link_dir_2 = "link_dir_2"
 | 
			
		||||
        os.mkdir(real_dir_1)
 | 
			
		||||
        os.mkdir(real_dir_2)
 | 
			
		||||
 | 
			
		||||
        # Create a file and a directory
 | 
			
		||||
        _, real_file_1 = tempfile.mkstemp(prefix="real_1", suffix=".txt", dir=".")
 | 
			
		||||
        _, real_file_2 = tempfile.mkstemp(prefix="real_2", suffix=".txt", dir=".")
 | 
			
		||||
        link_file_1 = "link_1.txt"
 | 
			
		||||
        link_file_2 = "link_2.txt"
 | 
			
		||||
 | 
			
		||||
        # Make hard link/junction
 | 
			
		||||
        symlink._windows_create_hard_link(real_file_1, link_file_1)
 | 
			
		||||
        symlink._windows_create_hard_link(real_file_2, link_file_2)
 | 
			
		||||
        symlink._windows_create_junction(real_dir_1, link_dir_1)
 | 
			
		||||
        symlink._windows_create_junction(real_dir_2, link_dir_2)
 | 
			
		||||
 | 
			
		||||
        assert symlink.readlink(link_file_1) == os.path.abspath(real_file_1)
 | 
			
		||||
        assert symlink.readlink(link_file_2) == os.path.abspath(real_file_2)
 | 
			
		||||
        assert symlink.readlink(link_dir_1) == os.path.abspath(real_dir_1)
 | 
			
		||||
        assert symlink.readlink(link_dir_2) == os.path.abspath(real_dir_2)
 | 
			
		||||
@@ -8,7 +8,7 @@
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from llnl.util.filesystem import resolve_link_target_relative_to_the_link
 | 
			
		||||
from llnl.util.symlink import resolve_link_target_relative_to_the_link
 | 
			
		||||
 | 
			
		||||
import spack.mirror
 | 
			
		||||
import spack.repo
 | 
			
		||||
@@ -228,6 +228,9 @@ def successful_expand(_class):
 | 
			
		||||
    def successful_apply(*args, **kwargs):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def successful_symlink(*args, **kwargs):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    with Stage("spack-mirror-test") as stage:
 | 
			
		||||
        mirror_root = os.path.join(stage.path, "test-mirror")
 | 
			
		||||
 | 
			
		||||
@@ -235,6 +238,7 @@ def successful_apply(*args, **kwargs):
 | 
			
		||||
        monkeypatch.setattr(spack.fetch_strategy.URLFetchStrategy, "expand", successful_expand)
 | 
			
		||||
        monkeypatch.setattr(spack.patch, "apply_patch", successful_apply)
 | 
			
		||||
        monkeypatch.setattr(spack.caches.MirrorCache, "store", record_store)
 | 
			
		||||
        monkeypatch.setattr(spack.caches.MirrorCache, "symlink", successful_symlink)
 | 
			
		||||
 | 
			
		||||
        with spack.config.override("config:checksum", False):
 | 
			
		||||
            spack.mirror.create(mirror_root, list(spec.traverse()))
 | 
			
		||||
 
 | 
			
		||||
@@ -147,8 +147,15 @@ def test_relocate_links(tmpdir):
 | 
			
		||||
 | 
			
		||||
    own_prefix_path = str(tmpdir.join("prefix_a", "file"))
 | 
			
		||||
    dep_prefix_path = str(tmpdir.join("prefix_b", "file"))
 | 
			
		||||
    new_own_prefix_path = str(tmpdir.join("new_prefix_a", "file"))
 | 
			
		||||
    new_dep_prefix_path = str(tmpdir.join("new_prefix_b", "file"))
 | 
			
		||||
    system_path = os.path.join(os.path.sep, "system", "path")
 | 
			
		||||
 | 
			
		||||
    fs.touchp(own_prefix_path)
 | 
			
		||||
    fs.touchp(new_own_prefix_path)
 | 
			
		||||
    fs.touchp(dep_prefix_path)
 | 
			
		||||
    fs.touchp(new_dep_prefix_path)
 | 
			
		||||
 | 
			
		||||
    # Old prefixes to new prefixes
 | 
			
		||||
    prefix_to_prefix = OrderedDict(
 | 
			
		||||
        [
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user