Windows: prefer Python decompression support (#36507)
On Windows, several commonly available system tools for decompression
are unreliable (gz/bz2/xz). This commit refactors `decompressor_for`
to call out to a Windows or Unix-specific method:
* The decompressor_for_nix method behaves the same as before and
  generally treats the Python/system support options for decompression
  as interchangeable (although avoids using Python's built-in tar
  support since that has had issues with permissions).
* The decompressor_for_win method can only use Python support for
  gz/bz2/xz, although for a tar.gz it does use system support for
  untar (after the decompression step). .zip uses the system tar
  utility, and .Z depends on external support (i.e. that the user
  has installed 7zip).
A naming scheme has been introduced for the various _decompression
methods:
* _system_gunzip means to use a system tool (and fail if it's not
    available)
* _py_gunzip means to use Python's built-in support for decompressing
    .gzip files (and fail if it's not available)
* _gunzip is a method that can do either
			
			
This commit is contained in:
		@@ -5,6 +5,7 @@
 | 
			
		||||
 | 
			
		||||
import os
 | 
			
		||||
import shutil
 | 
			
		||||
import sys
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
@@ -62,6 +63,7 @@ def test_native_unpacking(tmpdir_factory, archive_file):
 | 
			
		||||
        assert "TEST" in contents
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.skipif(sys.platform == "win32", reason="Only Python unpacking available on Windows")
 | 
			
		||||
@pytest.mark.parametrize("archive_file", ext_archive.keys(), indirect=True)
 | 
			
		||||
def test_system_unpacking(tmpdir_factory, archive_file, compr_support_check):
 | 
			
		||||
    # actually run test
 | 
			
		||||
 
 | 
			
		||||
@@ -14,6 +14,7 @@
 | 
			
		||||
from llnl.util import tty
 | 
			
		||||
 | 
			
		||||
import spack.util.path as spath
 | 
			
		||||
from spack.error import SpackError
 | 
			
		||||
from spack.util.executable import CommandNotFoundError, which
 | 
			
		||||
 | 
			
		||||
# Supported archive extensions.
 | 
			
		||||
@@ -68,12 +69,9 @@ def allowed_archive(path):
 | 
			
		||||
    return False if not path else any(path.endswith(t) for t in ALLOWED_ARCHIVE_TYPES)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _untar(archive_file):
 | 
			
		||||
    """Untar archive. Prefer native Python `tarfile`
 | 
			
		||||
    but fall back to system utility if there is a failure
 | 
			
		||||
    to find the native Python module (tar on Unix).
 | 
			
		||||
    Filters archives through native support gzip and xz
 | 
			
		||||
    compression formats.
 | 
			
		||||
def _system_untar(archive_file):
 | 
			
		||||
    """Returns path to unarchived tar file.
 | 
			
		||||
    Untars archive via system tar.
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
        archive_file (str): absolute path to the archive to be extracted.
 | 
			
		||||
@@ -88,32 +86,50 @@ def _untar(archive_file):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _bunzip2(archive_file):
 | 
			
		||||
    """Use Python's bz2 module to decompress bz2 compressed archives
 | 
			
		||||
    """Returns path to decompressed file.
 | 
			
		||||
    Uses Python's bz2 module to decompress bz2 compressed archives
 | 
			
		||||
    Fall back to system utility failing to find Python module `bz2`
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
        archive_file (str): absolute path to the bz2 archive to be decompressed
 | 
			
		||||
    """
 | 
			
		||||
    if is_bz2_supported():
 | 
			
		||||
        return _py_bunzip(archive_file)
 | 
			
		||||
    else:
 | 
			
		||||
        return _system_bunzip(archive_file)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _py_bunzip(archive_file):
 | 
			
		||||
    """Returns path to decompressed file.
 | 
			
		||||
    Decompresses bz2 compressed archives/files via python's bz2 module"""
 | 
			
		||||
    decompressed_file = os.path.basename(strip_compression_extension(archive_file, "bz2"))
 | 
			
		||||
    working_dir = os.getcwd()
 | 
			
		||||
    archive_out = os.path.join(working_dir, decompressed_file)
 | 
			
		||||
    f_bz = bz2.BZ2File(archive_file, mode="rb")
 | 
			
		||||
    with open(archive_out, "wb") as ar:
 | 
			
		||||
        shutil.copyfileobj(f_bz, ar)
 | 
			
		||||
    f_bz.close()
 | 
			
		||||
    return archive_out
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _system_bunzip(archive_file):
 | 
			
		||||
    """Returns path to decompressed file.
 | 
			
		||||
    Decompresses bz2 compressed archives/files via system bzip2 utility"""
 | 
			
		||||
    compressed_file_name = os.path.basename(archive_file)
 | 
			
		||||
    decompressed_file = os.path.basename(strip_extension(archive_file, "bz2"))
 | 
			
		||||
    decompressed_file = os.path.basename(strip_compression_extension(archive_file, "bz2"))
 | 
			
		||||
    working_dir = os.getcwd()
 | 
			
		||||
    archive_out = os.path.join(working_dir, decompressed_file)
 | 
			
		||||
    copy_path = os.path.join(working_dir, compressed_file_name)
 | 
			
		||||
    if is_bz2_supported():
 | 
			
		||||
        f_bz = bz2.BZ2File(archive_file, mode="rb")
 | 
			
		||||
        with open(archive_out, "wb") as ar:
 | 
			
		||||
            shutil.copyfileobj(f_bz, ar)
 | 
			
		||||
        f_bz.close()
 | 
			
		||||
    else:
 | 
			
		||||
        shutil.copy(archive_file, copy_path)
 | 
			
		||||
        bunzip2 = which("bunzip2", required=True)
 | 
			
		||||
        bunzip2.add_default_arg("-q")
 | 
			
		||||
        return bunzip2(copy_path)
 | 
			
		||||
    shutil.copy(archive_file, copy_path)
 | 
			
		||||
    bunzip2 = which("bunzip2", required=True)
 | 
			
		||||
    bunzip2.add_default_arg("-q")
 | 
			
		||||
    bunzip2(copy_path)
 | 
			
		||||
    return archive_out
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _gunzip(archive_file):
 | 
			
		||||
    """Decompress `.gz` extensions. Prefer native Python `gzip` module.
 | 
			
		||||
    """Returns path to gunzip'd file
 | 
			
		||||
    Decompresses `.gz` extensions. Prefer native Python `gzip` module.
 | 
			
		||||
    Failing back to system utility gunzip.
 | 
			
		||||
    Like gunzip, but extracts in the current working directory
 | 
			
		||||
    instead of in-place.
 | 
			
		||||
@@ -121,34 +137,42 @@ def _gunzip(archive_file):
 | 
			
		||||
    Args:
 | 
			
		||||
        archive_file (str): absolute path of the file to be decompressed
 | 
			
		||||
    """
 | 
			
		||||
    decompressed_file = os.path.basename(strip_extension(archive_file, "gz"))
 | 
			
		||||
    if is_gzip_supported():
 | 
			
		||||
        return _py_gunzip(archive_file)
 | 
			
		||||
    else:
 | 
			
		||||
        return _system_gunzip(archive_file)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _py_gunzip(archive_file):
 | 
			
		||||
    """Returns path to gunzip'd file
 | 
			
		||||
    Decompresses `.gz` compressed archvies via python gzip module"""
 | 
			
		||||
    decompressed_file = os.path.basename(strip_compression_extension(archive_file, "gz"))
 | 
			
		||||
    working_dir = os.getcwd()
 | 
			
		||||
    destination_abspath = os.path.join(working_dir, decompressed_file)
 | 
			
		||||
    if is_gzip_supported():
 | 
			
		||||
        f_in = gzip.open(archive_file, "rb")
 | 
			
		||||
        with open(destination_abspath, "wb") as f_out:
 | 
			
		||||
            shutil.copyfileobj(f_in, f_out)
 | 
			
		||||
        f_in.close()
 | 
			
		||||
    else:
 | 
			
		||||
        _system_gunzip(archive_file)
 | 
			
		||||
    f_in = gzip.open(archive_file, "rb")
 | 
			
		||||
    with open(destination_abspath, "wb") as f_out:
 | 
			
		||||
        shutil.copyfileobj(f_in, f_out)
 | 
			
		||||
    f_in.close()
 | 
			
		||||
    return destination_abspath
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _system_gunzip(archive_file):
 | 
			
		||||
    decompressed_file = os.path.basename(strip_extension(archive_file, "gz"))
 | 
			
		||||
    """Returns path to gunzip'd file
 | 
			
		||||
    Decompresses `.gz` compressed files via system gzip"""
 | 
			
		||||
    decompressed_file = os.path.basename(strip_compression_extension(archive_file, "gz"))
 | 
			
		||||
    working_dir = os.getcwd()
 | 
			
		||||
    destination_abspath = os.path.join(working_dir, decompressed_file)
 | 
			
		||||
    compressed_file = os.path.basename(archive_file)
 | 
			
		||||
    copy_path = os.path.join(working_dir, compressed_file)
 | 
			
		||||
    shutil.copy(archive_file, copy_path)
 | 
			
		||||
    gzip = which("gzip")
 | 
			
		||||
    gzip = which("gzip", required=True)
 | 
			
		||||
    gzip.add_default_arg("-d")
 | 
			
		||||
    gzip(copy_path)
 | 
			
		||||
    return destination_abspath
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _unzip(archive_file):
 | 
			
		||||
    """
 | 
			
		||||
    """Returns path to extracted zip archive
 | 
			
		||||
    Extract Zipfile, searching for unzip system executable
 | 
			
		||||
    If unavailable, search for 'tar' executable on system and use instead
 | 
			
		||||
 | 
			
		||||
@@ -157,7 +181,7 @@ def _unzip(archive_file):
 | 
			
		||||
    """
 | 
			
		||||
    extracted_file = os.path.basename(strip_extension(archive_file, "zip"))
 | 
			
		||||
    if sys.platform == "win32":
 | 
			
		||||
        return _untar(archive_file)
 | 
			
		||||
        return _system_untar(archive_file)
 | 
			
		||||
    else:
 | 
			
		||||
        exe = "unzip"
 | 
			
		||||
        arg = "-q"
 | 
			
		||||
@@ -167,66 +191,76 @@ def _unzip(archive_file):
 | 
			
		||||
    return extracted_file
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _unZ(archive_file):
 | 
			
		||||
def _system_unZ(archive_file):
 | 
			
		||||
    """Returns path to decompressed file
 | 
			
		||||
    Decompress UNIX compress style compression
 | 
			
		||||
    Utilizes gunzip on unix and 7zip on Windows
 | 
			
		||||
    """
 | 
			
		||||
    if sys.platform == "win32":
 | 
			
		||||
        result = _7zip(archive_file)
 | 
			
		||||
        result = _system_7zip(archive_file)
 | 
			
		||||
    else:
 | 
			
		||||
        result = _system_gunzip(archive_file)
 | 
			
		||||
    return result
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _lzma_decomp(archive_file):
 | 
			
		||||
    """Decompress lzma compressed files. Prefer Python native
 | 
			
		||||
    """Returns path to decompressed xz file.
 | 
			
		||||
    Decompress lzma compressed files. Prefer Python native
 | 
			
		||||
    lzma module, but fall back on command line xz tooling
 | 
			
		||||
    to find available Python support. This is the xz command
 | 
			
		||||
    on Unix and 7z on Windows"""
 | 
			
		||||
    to find available Python support."""
 | 
			
		||||
    if is_lzma_supported():
 | 
			
		||||
        decompressed_file = os.path.basename(strip_extension(archive_file, "xz"))
 | 
			
		||||
        archive_out = os.path.join(os.getcwd(), decompressed_file)
 | 
			
		||||
        with open(archive_out, "wb") as ar:
 | 
			
		||||
            with lzma.open(archive_file) as lar:
 | 
			
		||||
                shutil.copyfileobj(lar, ar)
 | 
			
		||||
        return _py_lzma(archive_file)
 | 
			
		||||
    else:
 | 
			
		||||
        if sys.platform == "win32":
 | 
			
		||||
            return _7zip(archive_file)
 | 
			
		||||
        else:
 | 
			
		||||
            return _xz(archive_file)
 | 
			
		||||
        return _xz(archive_file)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _win_compressed_tarball_handler(archive_file):
 | 
			
		||||
    """Decompress and extract compressed tarballs on Windows.
 | 
			
		||||
    This method uses 7zip in conjunction with the tar utility
 | 
			
		||||
    to perform decompression and extraction in a two step process
 | 
			
		||||
    first using 7zip to decompress, and tar to extract.
 | 
			
		||||
def _win_compressed_tarball_handler(decompressor):
 | 
			
		||||
    """Returns function pointer to two stage decompression
 | 
			
		||||
    and extraction method
 | 
			
		||||
    Decompress and extract compressed tarballs on Windows.
 | 
			
		||||
    This method uses a decompression method in conjunction with
 | 
			
		||||
    the tar utility to perform decompression and extraction in
 | 
			
		||||
    a two step process first using decompressor to decompress,
 | 
			
		||||
    and tar to extract.
 | 
			
		||||
 | 
			
		||||
    The motivation for this method is the inability of 7zip
 | 
			
		||||
    to directly decompress and extract compressed archives
 | 
			
		||||
    in a single shot without undocumented workarounds, and
 | 
			
		||||
    the Windows tar utility's lack of access to the xz tool (unsupported on Windows)
 | 
			
		||||
    The motivation for this method is Windows tar utility's lack
 | 
			
		||||
    of access to the xz tool (unsupported natively on Windows) but
 | 
			
		||||
    can be installed manually or via spack
 | 
			
		||||
    """
 | 
			
		||||
    # perform intermediate extraction step
 | 
			
		||||
    # record name of new archive so we can extract
 | 
			
		||||
    # and later clean up
 | 
			
		||||
    decomped_tarball = _7zip(archive_file)
 | 
			
		||||
    # 7zip is able to one shot extract compressed archives
 | 
			
		||||
    # that have been named .txz. If that is the case, there will
 | 
			
		||||
    # be no intermediate archvie to extract.
 | 
			
		||||
    if check_extension(decomped_tarball, "tar"):
 | 
			
		||||
        # run tar on newly decomped archive
 | 
			
		||||
        outfile = _untar(decomped_tarball)
 | 
			
		||||
        # clean intermediate archive to mimic end result
 | 
			
		||||
        # produced by one shot decomp/extraction
 | 
			
		||||
        os.remove(decomped_tarball)
 | 
			
		||||
        return outfile
 | 
			
		||||
    return decomped_tarball
 | 
			
		||||
 | 
			
		||||
    def unarchive(archive_file):
 | 
			
		||||
        # perform intermediate extraction step
 | 
			
		||||
        # record name of new archive so we can extract
 | 
			
		||||
        # and later clean up
 | 
			
		||||
        decomped_tarball = decompressor(archive_file)
 | 
			
		||||
        if check_extension(decomped_tarball, "tar"):
 | 
			
		||||
            # run tar on newly decomped archive
 | 
			
		||||
            outfile = _system_untar(decomped_tarball)
 | 
			
		||||
            # clean intermediate archive to mimic end result
 | 
			
		||||
            # produced by one shot decomp/extraction
 | 
			
		||||
            os.remove(decomped_tarball)
 | 
			
		||||
            return outfile
 | 
			
		||||
        return decomped_tarball
 | 
			
		||||
 | 
			
		||||
    return unarchive
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _py_lzma(archive_file):
 | 
			
		||||
    """Returns path to decompressed .xz files
 | 
			
		||||
    Decompress lzma compressed .xz files via python lzma module"""
 | 
			
		||||
    decompressed_file = os.path.basename(strip_extension(archive_file, "xz"))
 | 
			
		||||
    archive_out = os.path.join(os.getcwd(), decompressed_file)
 | 
			
		||||
    with open(archive_out, "wb") as ar:
 | 
			
		||||
        with lzma.open(archive_file) as lar:
 | 
			
		||||
            shutil.copyfileobj(lar, ar)
 | 
			
		||||
    return archive_out
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _xz(archive_file):
 | 
			
		||||
    """Decompress lzma compressed .xz files via xz command line
 | 
			
		||||
    tool. Available only on Unix
 | 
			
		||||
    """Returns path to decompressed xz files
 | 
			
		||||
    Decompress lzma compressed .xz files via xz command line
 | 
			
		||||
    tool.
 | 
			
		||||
    """
 | 
			
		||||
    if sys.platform == "win32":
 | 
			
		||||
        raise RuntimeError("XZ tool unavailable on Windows")
 | 
			
		||||
    decompressed_file = os.path.basename(strip_extension(archive_file, "xz"))
 | 
			
		||||
    working_dir = os.getcwd()
 | 
			
		||||
    destination_abspath = os.path.join(working_dir, decompressed_file)
 | 
			
		||||
@@ -239,21 +273,20 @@ def _xz(archive_file):
 | 
			
		||||
    return destination_abspath
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _7zip(archive_file):
 | 
			
		||||
    """Unpack/decompress with 7z executable
 | 
			
		||||
def _system_7zip(archive_file):
 | 
			
		||||
    """Returns path to decompressed file
 | 
			
		||||
    Unpack/decompress with 7z executable
 | 
			
		||||
    7z is able to handle a number file extensions however
 | 
			
		||||
    it may not be available on system.
 | 
			
		||||
 | 
			
		||||
    Without 7z, Windows users with certain versions of Python may
 | 
			
		||||
    be unable to extract .xz files, and all Windows users will be unable
 | 
			
		||||
    to extract .Z files. If we cannot find 7z either externally or a
 | 
			
		||||
    Spack installed copy, we fail, but inform the user that 7z can
 | 
			
		||||
    be installed via `spack install 7zip`
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
        archive_file (str): absolute path of file to be unarchived
 | 
			
		||||
    """
 | 
			
		||||
    outfile = os.path.basename(strip_last_extension(archive_file))
 | 
			
		||||
    outfile = os.path.basename(strip_compression_extension(archive_file))
 | 
			
		||||
    _7z = which("7z")
 | 
			
		||||
    if not _7z:
 | 
			
		||||
        raise CommandNotFoundError(
 | 
			
		||||
@@ -267,12 +300,10 @@ def _7zip(archive_file):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def decompressor_for(path, extension=None):
 | 
			
		||||
    """Returns a function pointer to appropriate decompression
 | 
			
		||||
    algorithm based on extension type.
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
        path (str): path of the archive file requiring decompression
 | 
			
		||||
    """
 | 
			
		||||
    """Returns appropriate decompression/extraction algorithm function pointer
 | 
			
		||||
    for provided extension. If extension is none, it is computed
 | 
			
		||||
    from the `path` and the decompression function is derived
 | 
			
		||||
    from that information."""
 | 
			
		||||
    if not extension:
 | 
			
		||||
        extension = extension_from_file(path, decompress=True)
 | 
			
		||||
 | 
			
		||||
@@ -282,14 +313,28 @@ def decompressor_for(path, extension=None):
 | 
			
		||||
unrecognized file extension: '%s'"
 | 
			
		||||
            % extension
 | 
			
		||||
        )
 | 
			
		||||
    if sys.platform == "win32":
 | 
			
		||||
        return decompressor_for_win(extension)
 | 
			
		||||
    else:
 | 
			
		||||
        return decompressor_for_nix(extension)
 | 
			
		||||
 | 
			
		||||
    if re.match(r"\.?zip$", extension) or path.endswith(".zip"):
 | 
			
		||||
 | 
			
		||||
def decompressor_for_nix(extension):
 | 
			
		||||
    """Returns a function pointer to appropriate decompression
 | 
			
		||||
    algorithm based on extension type and unix specific considerations
 | 
			
		||||
    i.e. a reasonable expectation system utils like gzip, bzip2, and xz are
 | 
			
		||||
    available
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
        path (str): path of the archive file requiring decompression
 | 
			
		||||
    """
 | 
			
		||||
    if re.match(r"zip$", extension):
 | 
			
		||||
        return _unzip
 | 
			
		||||
 | 
			
		||||
    if re.match(r"gz", extension):
 | 
			
		||||
    if re.match(r"gz$", extension):
 | 
			
		||||
        return _gunzip
 | 
			
		||||
 | 
			
		||||
    if re.match(r"bz2", extension):
 | 
			
		||||
    if re.match(r"bz2$", extension):
 | 
			
		||||
        return _bunzip2
 | 
			
		||||
 | 
			
		||||
    # Python does not have native support
 | 
			
		||||
@@ -297,21 +342,80 @@ def decompressor_for(path, extension=None):
 | 
			
		||||
    # we rely on external tools such as tar,
 | 
			
		||||
    # 7z, or uncompressZ
 | 
			
		||||
    if re.match(r"Z$", extension):
 | 
			
		||||
        return _unZ
 | 
			
		||||
        return _system_unZ
 | 
			
		||||
 | 
			
		||||
    # Python and platform may not have support for lzma
 | 
			
		||||
    # compression. If no lzma support, use tools available on systems
 | 
			
		||||
    # 7zip on Windows and the xz tool on Unix systems.
 | 
			
		||||
    if re.match(r"xz", extension):
 | 
			
		||||
    if re.match(r"xz$", extension):
 | 
			
		||||
        return _lzma_decomp
 | 
			
		||||
 | 
			
		||||
    # Catch tar.xz/tar.Z files here for Windows
 | 
			
		||||
    # as the tar utility on Windows cannot handle such
 | 
			
		||||
    # compression types directly
 | 
			
		||||
    if ("xz" in extension or "Z" in extension) and sys.platform == "win32":
 | 
			
		||||
        return _win_compressed_tarball_handler
 | 
			
		||||
    return _system_untar
 | 
			
		||||
 | 
			
		||||
    return _untar
 | 
			
		||||
 | 
			
		||||
def _determine_py_decomp_archive_strategy(extension):
 | 
			
		||||
    """Returns appropriate python based decompression strategy
 | 
			
		||||
    based on extension type"""
 | 
			
		||||
    # Only rely on Python decompression support for gz
 | 
			
		||||
    if re.match(r"gz$", extension):
 | 
			
		||||
        return _py_gunzip
 | 
			
		||||
 | 
			
		||||
    # Only rely on Python decompression support for bzip2
 | 
			
		||||
    if re.match(r"bz2$", extension):
 | 
			
		||||
        return _py_bunzip
 | 
			
		||||
 | 
			
		||||
    # Only rely on Python decompression support for xz
 | 
			
		||||
    if re.match(r"xz$", extension):
 | 
			
		||||
        return _py_lzma
 | 
			
		||||
 | 
			
		||||
    return None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def decompressor_for_win(extension):
 | 
			
		||||
    """Returns a function pointer to appropriate decompression
 | 
			
		||||
    algorithm based on extension type and Windows specific considerations
 | 
			
		||||
 | 
			
		||||
    Windows natively vendors *only* tar, no other archive/compression utilities
 | 
			
		||||
    So we must rely exclusively on Python module support for all compression
 | 
			
		||||
    operations, tar for tarballs and zip files, and 7zip for Z compressed archives
 | 
			
		||||
    and files as Python does not provide support for the UNIX compress algorithm
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
        path (str): path of the archive file requiring decompression
 | 
			
		||||
        extension (str): extension
 | 
			
		||||
    """
 | 
			
		||||
    extension = expand_contracted_extension(extension)
 | 
			
		||||
    # Windows native tar can handle .zip extensions, use standard
 | 
			
		||||
    # unzip method
 | 
			
		||||
    if re.match(r"zip$", extension):
 | 
			
		||||
        return _unzip
 | 
			
		||||
 | 
			
		||||
    # if extension is standard tarball, invoke Windows native tar
 | 
			
		||||
    if re.match(r"tar$", extension):
 | 
			
		||||
        return _system_untar
 | 
			
		||||
 | 
			
		||||
    # Python does not have native support
 | 
			
		||||
    # of any kind for .Z files. In these cases,
 | 
			
		||||
    # we rely on 7zip, which must be installed outside
 | 
			
		||||
    # of spack and added to the PATH or externally detected
 | 
			
		||||
    if re.match(r"Z$", extension):
 | 
			
		||||
        return _system_unZ
 | 
			
		||||
 | 
			
		||||
    # Windows vendors no native decompression tools, attempt to derive
 | 
			
		||||
    # python based decompression strategy
 | 
			
		||||
    # Expand extension from contracted extension i.e. tar.gz from .tgz
 | 
			
		||||
    # no-op on non contracted extensions
 | 
			
		||||
    compression_extension = compression_ext_from_compressed_archive(extension)
 | 
			
		||||
    decompressor = _determine_py_decomp_archive_strategy(compression_extension)
 | 
			
		||||
    if not decompressor:
 | 
			
		||||
        raise SpackError(
 | 
			
		||||
            "Spack was unable to determine a proper decompression strategy for"
 | 
			
		||||
            f"valid extension: {extension}"
 | 
			
		||||
            "This is a bug, please file an issue at https://github.com/spack/spack/issues"
 | 
			
		||||
        )
 | 
			
		||||
    if "tar" not in extension:
 | 
			
		||||
        return decompressor
 | 
			
		||||
 | 
			
		||||
    return _win_compressed_tarball_handler(decompressor)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FileTypeInterface:
 | 
			
		||||
@@ -589,7 +693,7 @@ def extension_from_file(file, decompress=False):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def extension_from_path(path):
 | 
			
		||||
    """Get the allowed archive extension for a path.
 | 
			
		||||
    """Returns the allowed archive extension for a path.
 | 
			
		||||
    If path does not include a valid archive extension
 | 
			
		||||
    (see`spack.util.compression.ALLOWED_ARCHIVE_TYPES`) return None
 | 
			
		||||
    """
 | 
			
		||||
@@ -602,19 +706,23 @@ def extension_from_path(path):
 | 
			
		||||
    return None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def strip_last_extension(path):
 | 
			
		||||
    """Strips last supported archive extension from path"""
 | 
			
		||||
    if path:
 | 
			
		||||
        for ext in ALLOWED_SINGLE_EXT_ARCHIVE_TYPES:
 | 
			
		||||
            mod_path = check_and_remove_ext(path, ext)
 | 
			
		||||
            if mod_path != path:
 | 
			
		||||
                return mod_path
 | 
			
		||||
def strip_compression_extension(path, ext=None):
 | 
			
		||||
    """Returns path with last supported or provided archive extension stripped"""
 | 
			
		||||
    path = expand_contracted_extension_in_path(path)
 | 
			
		||||
    exts_to_check = EXTS
 | 
			
		||||
    if ext:
 | 
			
		||||
        exts_to_check = [ext]
 | 
			
		||||
    for ext_check in exts_to_check:
 | 
			
		||||
        mod_path = check_and_remove_ext(path, ext_check)
 | 
			
		||||
        if mod_path != path:
 | 
			
		||||
            return mod_path
 | 
			
		||||
    return path
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def strip_extension(path, ext=None):
 | 
			
		||||
    """Get the part of a path that does not include its compressed
 | 
			
		||||
    type extension."""
 | 
			
		||||
    """Returns the part of a path that does not include extension.
 | 
			
		||||
    If ext is given, only attempts to remove that extension. If no
 | 
			
		||||
    extension given, attempts to strip any valid extension from path"""
 | 
			
		||||
    if ext:
 | 
			
		||||
        return check_and_remove_ext(path, ext)
 | 
			
		||||
    for t in ALLOWED_ARCHIVE_TYPES:
 | 
			
		||||
@@ -625,7 +733,8 @@ def strip_extension(path, ext=None):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_extension(path, ext):
 | 
			
		||||
    """Check if extension is present in path"""
 | 
			
		||||
    """Returns true if extension is present in path
 | 
			
		||||
    false otherwise"""
 | 
			
		||||
    # Strip sourceforge suffix.
 | 
			
		||||
    prefix, _ = spath.find_sourceforge_suffix(path)
 | 
			
		||||
    if not ext.startswith(r"\."):
 | 
			
		||||
@@ -636,7 +745,7 @@ def check_extension(path, ext):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def reg_remove_ext(path, ext):
 | 
			
		||||
    """Regex remove ext from path"""
 | 
			
		||||
    """Returns path with ext remove via regex"""
 | 
			
		||||
    if path and ext:
 | 
			
		||||
        suffix = r"\.%s$" % ext
 | 
			
		||||
        return re.sub(suffix, "", path)
 | 
			
		||||
@@ -644,8 +753,41 @@ def reg_remove_ext(path, ext):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_and_remove_ext(path, ext):
 | 
			
		||||
    """If given extension is present in path, remove and return,
 | 
			
		||||
    otherwise just return path"""
 | 
			
		||||
    """Returns path with extension removed if extension
 | 
			
		||||
    is present in path. Otherwise just returns path"""
 | 
			
		||||
    if check_extension(path, ext):
 | 
			
		||||
        return reg_remove_ext(path, ext)
 | 
			
		||||
    return path
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _substitute_extension(path, old_ext, new_ext):
 | 
			
		||||
    """Returns path with old_ext replaced with new_ext.
 | 
			
		||||
    old_ext and new_ext can be extension strings or regexs"""
 | 
			
		||||
    return re.sub(rf"{old_ext}", rf"{new_ext}", path)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def expand_contracted_extension_in_path(path, ext=None):
 | 
			
		||||
    """Returns path with any contraction extension (i.e. tgz) expanded
 | 
			
		||||
    (i.e. tar.gz). If ext is specified, only attempt to expand that extension"""
 | 
			
		||||
    if not ext:
 | 
			
		||||
        ext = extension_from_path(path)
 | 
			
		||||
    expanded_ext = expand_contracted_extension(ext)
 | 
			
		||||
    if expanded_ext != ext:
 | 
			
		||||
        return _substitute_extension(path, ext, expanded_ext)
 | 
			
		||||
    return path
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def expand_contracted_extension(extension):
 | 
			
		||||
    """Return expanded version of contracted extension
 | 
			
		||||
    i.e. .tgz -> .tar.gz, no op on non contracted extensions"""
 | 
			
		||||
    extension = extension.strip(".")
 | 
			
		||||
    contraction_map = {"tgz": "tar.gz", "txz": "tar.xz", "tbz": "tar.bz2", "tbz2": "tar.bz2"}
 | 
			
		||||
    return contraction_map.get(extension, extension)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def compression_ext_from_compressed_archive(extension):
 | 
			
		||||
    """Returns compression extension for a compressed archive"""
 | 
			
		||||
    extension = expand_contracted_extension(extension)
 | 
			
		||||
    for ext in [*EXTS]:
 | 
			
		||||
        if ext in extension:
 | 
			
		||||
            return ext
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user