Fetching/decompressing: use magic numbers (#31589)

Spack currently depends on parsing filenames of downloaded files to
determine what type of archive they are and how to decompress them.
This commit adds a preliminary check based on magic numbers to
determine archive type (but falls back on name parsing if the
extension type cannot be determined).

As part of this work, this commit also enables decompression of
.tar.xz-compressed archives on Windows.
This commit is contained in:
John W. Parent 2022-09-26 03:01:42 -04:00 committed by GitHub
parent a5ea566bdf
commit 30f6fd8dc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 525 additions and 138 deletions

View File

@ -24,7 +24,7 @@
from llnl.util.lang import dedupe, memoized
from llnl.util.symlink import islink, symlink
from spack.util.executable import Executable
from spack.util.executable import CommandNotFoundError, Executable, which
from spack.util.path import path_to_os_path, system_path_filter
is_windows = _platform == "win32"
@ -113,6 +113,69 @@ def path_contains_subdirectory(path, root):
return norm_path.startswith(norm_root)
@memoized
def file_command(*args):
    """Creates entry point to `file` system command with provided arguments

    Args:
        *args: flags registered as default arguments on the returned command

    Returns:
        The executable located by ``which("file")`` with every entry of
        ``args`` added as a default argument.

    Raises:
        CommandNotFoundError: if ``which`` cannot locate ``file``. On Windows
            the error is replaced by one with a platform-specific message.
    """
    try:
        file_cmd = which("file", required=True)
    except CommandNotFoundError as e:
        if not is_windows:
            raise e
        raise CommandNotFoundError("`file` utility is not available on Windows")

    for default_arg in args:
        file_cmd.add_default_arg(default_arg)
    return file_cmd
@memoized
def _get_mime_type():
    """Generate method to call `file` system command to acquire mime type
    for a specified path

    The command is obtained from ``file_command`` with ``-b``, ``-h`` and
    ``--mime-type`` registered as default arguments.
    """
    mime_flags = ("-b", "-h", "--mime-type")
    return file_command(*mime_flags)
@memoized
def _get_mime_type_compressed():
    """Same as _get_mime_type but attempts to check for
    compression first

    The command is requested from ``file_command`` with its own argument
    list instead of appending ``-Z`` to the object returned by
    ``_get_mime_type()``. That object is memoized and also used by
    ``mime_type``; adding a default argument to it would make every later
    ``mime_type`` call run with ``-Z`` too.
    """
    # NOTE(review): relies on file_command handing back a distinct command
    # object for a distinct argument tuple -- confirm `which` builds a new
    # Executable on each call.
    return file_command("-b", "-h", "--mime-type", "-Z")
def mime_type(filename):
    """Returns the mime type and subtype of a file.

    Args:
        filename: file to be analyzed

    Returns:
        Tuple containing the MIME type and subtype
    """
    file_cmd = _get_mime_type()
    output = file_cmd(filename, output=str, error=str).strip()
    tty.debug("==> " + output)
    # "type/subtype" -> ("type", "subtype"); the subtype is "" without a "/"
    major, _, minor = output.partition("/")
    return major, minor
def compressed_mime_type(filename):
    """Same as mime_type but checks for type that has been compressed

    Args:
        filename (str): file to be analyzed

    Returns:
        Tuple containing the MIME type and subtype
    """
    file_cmd = _get_mime_type_compressed()
    output = file_cmd(filename, output=str, error=str).strip()
    tty.debug("==> " + output)
    # "type/subtype" -> ("type", "subtype"); the subtype is "" without a "/"
    major, _, minor = output.partition("/")
    return major, minor
#: This generates the library filenames that may appear on any OS.
library_extensions = ["a", "la", "so", "tbd", "dylib"]

View File

@ -19,6 +19,7 @@
import ruamel.yaml as yaml
from six.moves.urllib.error import HTTPError, URLError
import llnl.util.filesystem as fsys
import llnl.util.lang
import llnl.util.tty as tty
from llnl.util.filesystem import mkdirp
@ -653,7 +654,7 @@ def get_buildfile_manifest(spec):
for filename in files:
path_name = os.path.join(root, filename)
m_type, m_subtype = relocate.mime_type(path_name)
m_type, m_subtype = fsys.mime_type(path_name)
rel_path_name = os.path.relpath(path_name, spec.prefix)
added = False

View File

@ -54,7 +54,7 @@
import spack.util.url as url_util
import spack.util.web as web_util
import spack.version
from spack.util.compression import decompressor_for, extension
from spack.util.compression import decompressor_for, extension_from_path
from spack.util.executable import CommandNotFoundError, which
from spack.util.string import comma_and, quote
@ -613,7 +613,7 @@ def expand(self):
@_needs_stage
def archive(self, destination, **kwargs):
assert extension(destination) == "tar.gz"
assert extension_from_path(destination) == "tar.gz"
assert self.stage.source_path.startswith(self.stage.path)
tar = which("tar", required=True)

View File

@ -11,6 +11,7 @@
import macholib.mach_o
import macholib.MachO
import llnl.util.filesystem as fs
import llnl.util.lang
import llnl.util.tty as tty
from llnl.util.lang import memoized
@ -887,7 +888,7 @@ def file_is_relocatable(filename, paths_to_relocate=None):
# Remove the RPATHS from the strings in the executable
set_of_strings = set(strings(filename, output=str).split())
m_type, m_subtype = mime_type(filename)
m_type, m_subtype = fs.mime_type(filename)
if m_type == "application":
tty.debug("{0},{1}".format(m_type, m_subtype), level=2)
@ -923,7 +924,7 @@ def is_binary(filename):
Returns:
True or False
"""
m_type, _ = mime_type(filename)
m_type, _ = fs.mime_type(filename)
msg = "[{0}] -> ".format(filename)
if m_type == "application":
@ -934,30 +935,6 @@ def is_binary(filename):
return False
@llnl.util.lang.memoized
def _get_mime_type():
file_cmd = executable.which("file")
for arg in ["-b", "-h", "--mime-type"]:
file_cmd.add_default_arg(arg)
return file_cmd
@llnl.util.lang.memoized
def mime_type(filename):
"""Returns the mime type and subtype of a file.
Args:
filename: file to be analyzed
Returns:
Tuple containing the MIME type and subtype
"""
output = _get_mime_type()(filename, output=str, error=str).strip()
tty.debug("==> " + output, level=2)
type, _, subtype = output.partition("/")
return type, subtype
# Memoize this due to repeated calls to libraries in the same directory.
@llnl.util.lang.memoized
def _exists_dir(dirname):
@ -975,7 +952,7 @@ def fixup_macos_rpath(root, filename):
True if fixups were applied, else False
"""
abspath = os.path.join(root, filename)
if mime_type(abspath) != ("application", "x-mach-binary"):
if fs.mime_type(abspath) != ("application", "x-mach-binary"):
return False
# Get Mach-O header commands

View File

@ -22,6 +22,9 @@
for ext in scomp.ALLOWED_ARCHIVE_TYPES
if "TAR" not in ext
]
# Spack does not use Python native handling for tarballs or zip
# Don't test tarballs or zip in native test
native_archive_list = [key for key in ext_archive.keys() if "tar" not in key and "zip" not in key]
def support_stub():
@ -30,10 +33,9 @@ def support_stub():
@pytest.fixture
def compr_support_check(monkeypatch):
monkeypatch.setattr(scomp, "lzma_support", support_stub)
monkeypatch.setattr(scomp, "tar_support", support_stub)
monkeypatch.setattr(scomp, "gzip_support", support_stub)
monkeypatch.setattr(scomp, "bz2_support", support_stub)
monkeypatch.setattr(scomp, "is_lzma_supported", support_stub)
monkeypatch.setattr(scomp, "is_gzip_supported", support_stub)
monkeypatch.setattr(scomp, "is_bz2_supported", support_stub)
@pytest.fixture
@ -46,10 +48,9 @@ def archive_file(tmpdir_factory, request):
return os.path.join(str(tmpdir), "Foo.%s" % extension)
@pytest.mark.parametrize("archive_file", ext_archive.keys(), indirect=True)
@pytest.mark.parametrize("archive_file", native_archive_list, indirect=True)
def test_native_unpacking(tmpdir_factory, archive_file):
extension = scomp.extension(archive_file)
util = scomp.decompressor_for(archive_file, extension)
util = scomp.decompressor_for(archive_file)
tmpdir = tmpdir_factory.mktemp("comp_test")
with working_dir(str(tmpdir)):
assert not os.listdir(os.getcwd())
@ -63,9 +64,8 @@ def test_native_unpacking(tmpdir_factory, archive_file):
@pytest.mark.parametrize("archive_file", ext_archive.keys(), indirect=True)
def test_system_unpacking(tmpdir_factory, archive_file, compr_support_check):
extension = scomp.extension(archive_file)
# actually run test
util = scomp.decompressor_for(archive_file, extension)
util = scomp.decompressor_for(archive_file)
tmpdir = tmpdir_factory.mktemp("system_comp_test")
with working_dir(str(tmpdir)):
assert not os.listdir(os.getcwd())
@ -78,23 +78,25 @@ def test_system_unpacking(tmpdir_factory, archive_file, compr_support_check):
def test_unallowed_extension():
bad_ext_archive = "Foo.py"
# use a cxx file as python files included for the test
# are picked up by the linter and break style checks
bad_ext_archive = "Foo.cxx"
with pytest.raises(CommandNotFoundError):
scomp.decompressor_for(bad_ext_archive, "py")
scomp.decompressor_for(bad_ext_archive)
@pytest.mark.parametrize("archive", ext_archive.values())
def test_get_extension(archive):
ext = scomp.extension(archive)
ext = scomp.extension_from_path(archive)
assert ext_archive[ext] == archive
def test_get_bad_extension():
archive = "Foo.py"
ext = scomp.extension(archive)
archive = "Foo.cxx"
ext = scomp.extension_from_path(archive)
assert ext is None
@pytest.mark.parametrize("path", ext_archive.values())
def test_allowed_archvie(path):
def test_allowed_archive(path):
assert scomp.allowed_archive(path)

View File

@ -36,6 +36,7 @@
import spack.error
import spack.util.compression as comp
import spack.util.path as spath
import spack.version
@ -366,17 +367,15 @@ def split_url_extension(path):
# Strip off sourceforge download suffix.
# e.g. https://sourceforge.net/projects/glew/files/glew/2.0.0/glew-2.0.0.tgz/download
match = re.search(r"(.*(?:sourceforge\.net|sf\.net)/.*)(/download)$", path)
if match:
prefix, suffix = match.groups()
prefix, suffix = spath.find_sourceforge_suffix(path)
ext = comp.extension(prefix)
ext = comp.extension_from_path(prefix)
if ext is not None:
prefix = comp.strip_extension(prefix)
else:
prefix, suf = strip_query_and_fragment(prefix)
ext = comp.extension(prefix)
ext = comp.extension_from_path(prefix)
prefix = comp.strip_extension(prefix)
suffix = suf + suffix
if ext is None:

View File

@ -3,61 +3,67 @@
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import inspect
import io
import os
import re
import shutil
import sys
from itertools import product
from llnl.util import tty
import spack.util.path as spath
from spack.util.executable import CommandNotFoundError, which
# Supported archive extensions.
PRE_EXTS = ["tar", "TAR"]
EXTS = ["gz", "bz2", "xz", "Z"]
NOTAR_EXTS = ["zip", "tgz", "tbz", "tbz2", "txz"]
NOTAR_EXTS = ["zip", "tgz", "tbz2", "tbz", "txz"]
# Add PRE_EXTS and EXTS last so that .tar.gz is matched *before* .tar or .gz
ALLOWED_ARCHIVE_TYPES = (
[".".join(ext) for ext in product(PRE_EXTS, EXTS)] + PRE_EXTS + EXTS + NOTAR_EXTS
)
ALLOWED_SINGLE_EXT_ARCHIVE_TYPES = PRE_EXTS + EXTS + NOTAR_EXTS
is_windows = sys.platform == "win32"
try:
import bz2 # noqa
def bz2_support():
try:
import bz2 # noqa: F401
return True
except ImportError:
return False
_bz2_support = True
except ImportError:
_bz2_support = False
def gzip_support():
try:
import gzip # noqa: F401
try:
import gzip # noqa
return True
except ImportError:
return False
_gzip_support = True
except ImportError:
_gzip_support = False
def lzma_support():
try:
import lzma # noqa: F401 # novm
try:
import lzma # noqa # novermin
return True
except ImportError:
return False
_lzma_support = True
except ImportError:
_lzma_support = False
def tar_support():
try:
import tarfile # noqa: F401
def is_lzma_supported():
return _lzma_support
return True
except ImportError:
return False
def is_gzip_supported():
return _gzip_support
def is_bz2_supported():
return _bz2_support
def allowed_archive(path):
@ -75,8 +81,7 @@ def _untar(archive_file):
archive_file (str): absolute path to the archive to be extracted.
Can be one of .tar(.[gz|bz2|xz|Z]) or .(tgz|tbz|tbz2|txz).
"""
_, ext = os.path.splitext(archive_file)
outfile = os.path.basename(archive_file.strip(ext))
outfile = os.path.basename(strip_extension(archive_file, "tar"))
tar = which("tar", required=True)
tar.add_default_arg("-oxf")
@ -91,15 +96,12 @@ def _bunzip2(archive_file):
Args:
archive_file (str): absolute path to the bz2 archive to be decompressed
"""
_, ext = os.path.splitext(archive_file)
compressed_file_name = os.path.basename(archive_file)
decompressed_file = os.path.basename(archive_file.strip(ext))
decompressed_file = os.path.basename(strip_extension(archive_file, "bz2"))
working_dir = os.getcwd()
archive_out = os.path.join(working_dir, decompressed_file)
copy_path = os.path.join(working_dir, compressed_file_name)
if bz2_support():
import bz2
if is_bz2_supported():
f_bz = bz2.BZ2File(archive_file, mode="rb")
with open(archive_out, "wb") as ar:
shutil.copyfileobj(f_bz, ar)
@ -121,13 +123,10 @@ def _gunzip(archive_file):
Args:
archive_file (str): absolute path of the file to be decompressed
"""
_, ext = os.path.splitext(archive_file)
decompressed_file = os.path.basename(archive_file.strip(ext))
decompressed_file = os.path.basename(strip_extension(archive_file, "gz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
if gzip_support():
import gzip
if is_gzip_supported():
f_in = gzip.open(archive_file, "rb")
with open(destination_abspath, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
@ -138,8 +137,7 @@ def _gunzip(archive_file):
def _system_gunzip(archive_file):
_, ext = os.path.splitext(archive_file)
decompressed_file = os.path.basename(archive_file.strip(ext))
decompressed_file = os.path.basename(strip_extension(archive_file, "gz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
compressed_file = os.path.basename(archive_file)
@ -159,17 +157,16 @@ def _unzip(archive_file):
Args:
archive_file (str): absolute path of the file to be decompressed
"""
destination_abspath = os.getcwd()
exe = "unzip"
arg = "-q"
extracted_file = os.path.basename(strip_extension(archive_file, "zip"))
if is_windows:
exe = "tar"
arg = "-xf"
unzip = which(exe, required=True)
unzip.add_default_arg(arg)
unzip(archive_file)
return destination_abspath
return _untar(archive_file)
else:
exe = "unzip"
arg = "-q"
unzip = which(exe, required=True)
unzip.add_default_arg(arg)
unzip(archive_file)
return extracted_file
def _unZ(archive_file):
@ -185,11 +182,8 @@ def _lzma_decomp(archive_file):
lzma module, but fall back on command line xz tooling
to find available Python support. This is the xz command
on Unix and 7z on Windows"""
if lzma_support():
import lzma # novermin
_, ext = os.path.splitext(archive_file)
decompressed_file = os.path.basename(archive_file.strip(ext))
if is_lzma_supported():
decompressed_file = os.path.basename(strip_extension(archive_file, "xz"))
archive_out = os.path.join(os.getcwd(), decompressed_file)
with open(archive_out, "wb") as ar:
with lzma.open(archive_file) as lar:
@ -201,14 +195,41 @@ def _lzma_decomp(archive_file):
return _xz(archive_file)
def _win_compressed_tarball_handler(archive_file):
    """Decompress and extract compressed tarballs on Windows.

    This method uses 7zip in conjunction with the tar utility
    to perform decompression and extraction in a two step process
    first using 7zip to decompress, and tar to extract.

    The motivation for this method is the inability of 7zip
    to directly decompress and extract compressed archives
    in a single shot without undocumented workarounds, and
    the Windows tar utility's lack of access to the xz tool (unsupported on Windows)

    Args:
        archive_file (str): path of the compressed tarball

    Returns:
        The value returned by ``_untar`` when an intermediate ``.tar`` was
        produced, otherwise the name returned by ``_7zip``.
    """
    # Intermediate step: decompress, remembering the name of what 7zip
    # produced so it can be extracted and cleaned up afterwards.
    decomped_tarball = _7zip(archive_file)

    # 7zip is able to one shot extract compressed archives that have been
    # named .txz. If that is the case, there is no intermediate archive
    # to extract.
    if not check_extension(decomped_tarball, "tar"):
        return decomped_tarball

    # Run tar on the newly decompressed archive, then remove the
    # intermediate archive to mimic the end result of a one shot
    # decompression/extraction.
    outfile = _untar(decomped_tarball)
    os.remove(decomped_tarball)
    return outfile
def _xz(archive_file):
"""Decompress lzma compressed .xz files via xz command line
tool. Available only on Unix
"""
if is_windows:
raise RuntimeError("XZ tool unavailable on Windows")
_, ext = os.path.splitext(archive_file)
decompressed_file = os.path.basename(archive_file.strip(ext))
decompressed_file = os.path.basename(strip_extension(archive_file, "xz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
compressed_file = os.path.basename(archive_file)
@ -234,84 +255,399 @@ def _7zip(archive_file):
Args:
archive_file (str): absolute path of file to be unarchived
"""
_, ext = os.path.splitext(archive_file)
outfile = os.path.basename(archive_file.strip(ext))
outfile = os.path.basename(strip_last_extension(archive_file))
_7z = which("7z")
if not _7z:
raise CommandNotFoundError(
"7z unavailable,\
unable to extract %s files. 7z can be installed via Spack"
% ext
% extension_from_path(archive_file)
)
_7z.add_default_arg("e")
_7z(archive_file)
return outfile
def decompressor_for(path, ext):
def decompressor_for(path, extension=None):
"""Returns a function pointer to appropriate decompression
algorithm based on extension type.
Args:
path (str): path of the archive file requiring decompression
ext (str): Extension of archive file
"""
if not allowed_archive(ext):
if not extension:
extension = extension_from_file(path, decompress=True)
if not allowed_archive(extension):
raise CommandNotFoundError(
"Cannot extract archive, \
unrecognized file extension: '%s'"
% ext
% extension
)
if re.match(r"\.?zip$", ext) or path.endswith(".zip"):
if re.match(r"\.?zip$", extension) or path.endswith(".zip"):
return _unzip
if re.match(r"gz", ext):
if re.match(r"gz", extension):
return _gunzip
if re.match(r"bz2", ext):
if re.match(r"bz2", extension):
return _bunzip2
# Python does not have native support
# of any kind for .Z files. In these cases,
# we rely on external tools such as tar,
# 7z, or uncompressZ
if re.match(r"Z$", ext):
if re.match(r"Z$", extension):
return _unZ
# Python and platform may not have support for lzma
# compression. If no lzma support, use tools available on systems
# 7zip on Windows and the xz tool on Unix systems.
if re.match(r"xz", ext):
if re.match(r"xz", extension):
return _lzma_decomp
if ("xz" in ext or "Z" in ext) and is_windows:
return _7zip
# Catch tar.xz/tar.Z files here for Windows
# as the tar utility on Windows cannot handle such
# compression types directly
if ("xz" in extension or "Z" in extension) and is_windows:
return _win_compressed_tarball_handler
return _untar
def strip_extension(path):
"""Get the part of a path that does not include its compressed
type extension."""
for type in ALLOWED_ARCHIVE_TYPES:
suffix = r"\.%s$" % type
if re.search(suffix, path):
return re.sub(suffix, "", path)
return path
class FileTypeInterface:
    """
    Base interface class for describing and querying file type information.

    FileType describes information about a single file type
    such as extension, and byte header properties, and provides an interface
    to check a given file against said type based on magic number.

    This class should be subclassed each time a new type is to be
    described.

    Note: This class should not be used directly as it does not define any specific
    file. Attempts to directly use this class will fail, as it does not define
    a magic number or extension string.

    Subclasses should each describe a different
    type of file. In order to do so, they must define
    the extension string, magic number, and header offset (if non zero).
    If a class has multiple magic numbers, it will need to
    override the method describing that file type's magic numbers and
    the method that checks a type's magic numbers against a given file's.
    """

    # byte position at which the magic number starts
    OFFSET = 0
    # whether the type describes a compression format
    compressed = False

    @staticmethod
    def name():
        raise NotImplementedError

    @classmethod
    def magic_number(cls):
        """Return a list of all potential magic numbers for a filetype"""
        return [
            value
            for attr, value in inspect.getmembers(cls)
            if attr.startswith("_MAGIC_NUMBER")
        ]

    @classmethod
    def header_size(cls):
        """Return size of largest magic number associated with file type"""
        return max(len(magic) for magic in cls.magic_number())

    @classmethod
    def _bytes_check(cls, magic_bytes):
        """Return True if ``magic_bytes`` starts with any of the magic numbers"""
        return any(magic_bytes.startswith(magic) for magic in cls.magic_number())

    @classmethod
    def is_file_of_type(cls, iostream):
        """Query byte stream for appropriate magic number

        Args:
            iostream: file byte stream

        Returns:
            Bool denoting whether file is of class file type
            based on magic number
        """
        if not iostream:
            return False
        # read the header at the location of the magic bytes, then rewind
        # so the stream is left at the beginning of the file
        iostream.seek(cls.OFFSET)
        magic_bytes = iostream.read(cls.header_size())
        iostream.seek(0)
        return cls._bytes_check(magic_bytes)
def extension(path):
"""Get the archive extension for a path."""
class CompressedFileTypeInterface(FileTypeInterface):
    """Interface class for FileTypes that include compression information"""

    # marks every subclass as a compression format
    compressed = True

    @staticmethod
    def decomp_in_memory(stream):
        """This method decompresses and loads the first 200 or so bytes of a compressed file
        to check for compressed archives. This does not decompress the entire file and should
        not be used for direct expansion of archives/compressed files

        Raises:
            NotImplementedError: always; compression subclasses provide the
                implementation.
        """
        raise NotImplementedError("Implementation by compression subclass required")
class BZipFileType(CompressedFileTypeInterface):
    """File type description for bzip2 compressed data."""

    _MAGIC_NUMBER = b"\x42\x5a\x68"
    extension = "bz2"

    @staticmethod
    def name():
        return "bzip2 compressed data"

    @staticmethod
    def decomp_in_memory(stream):
        """Decompress the head of ``stream`` into an in-memory byte stream.

        Returns:
            ``io.BytesIO`` holding whatever the bz2 decompressor yields for
            the bytes read, or None when bz2 support is unavailable.
        """
        if not is_bz2_supported():
            return None
        # checking for underlying archive, only decomp as many bytes
        # as is absolutely necessary for largest archive header (tar)
        peek_size = TarFileType.OFFSET + TarFileType.header_size()
        compressed_head = stream.read(peek_size)
        decompressed_head = bz2.BZ2Decompressor().decompress(compressed_head)
        return io.BytesIO(initial_bytes=decompressed_head)
class ZCompressedFileType(CompressedFileTypeInterface):
    """File type description for ``.Z`` compressed data.

    Two magic numbers are declared; both are picked up by the
    ``_MAGIC_NUMBER`` attribute-name prefix.
    """

    _MAGIC_NUMBER_LZW = b"\x1f\x9d"
    _MAGIC_NUMBER_LZH = b"\x1f\xa0"
    extension = "Z"

    @staticmethod
    def name():
        return "compress'd data"

    @staticmethod
    def decomp_in_memory(stream):
        """Always return None: no in-memory decompression is attempted."""
        # python has no method of decompressing `.Z` files in memory
        return None
class GZipFileType(CompressedFileTypeInterface):
    """File type description for gzip compressed data."""

    _MAGIC_NUMBER = b"\x1f\x8b\x08"
    extension = "gz"

    @staticmethod
    def name():
        return "gzip compressed data"

    @staticmethod
    def decomp_in_memory(stream):
        """Decompress the head of ``stream`` into an in-memory byte stream.

        Returns:
            ``io.BytesIO`` holding the first decompressed bytes, or None
            when gzip support is unavailable.
        """
        if not is_gzip_supported():
            return None
        # checking for underlying archive, only decomp as many bytes
        # as is absolutely necessary for largest archive header (tar)
        gzip_reader = gzip.GzipFile(fileobj=stream)
        peek_size = TarFileType.OFFSET + TarFileType.header_size()
        decompressed_head = gzip_reader.read(peek_size)
        return io.BytesIO(initial_bytes=decompressed_head)
class LzmaFileType(CompressedFileTypeInterface):
    """File type description for xz compressed data."""

    _MAGIC_NUMBER = b"\xfd7zXZ"
    extension = "xz"

    @staticmethod
    def name():
        return "xz compressed data"

    @staticmethod
    def decomp_in_memory(stream):
        """Decompress the head of ``stream`` into an in-memory byte stream.

        Returns:
            ``io.BytesIO`` holding at most the requested number of
            decompressed bytes, or None when lzma support is unavailable.
        """
        if not is_lzma_supported():
            return None
        # checking for underlying archive, only decomp as many bytes
        # as is absolutely necessary for largest archive header (tar)
        max_size = TarFileType.OFFSET + TarFileType.header_size()
        decompressor = lzma.LZMADecompressor()
        compressed_head = stream.read(max_size)
        decompressed_head = decompressor.decompress(compressed_head, max_length=max_size)
        return io.BytesIO(initial_bytes=decompressed_head)
class TarFileType(FileTypeInterface):
    """File type description for tar archives.

    The magic number is looked up at byte ``OFFSET`` rather than at the
    start of the file. Two variants are declared: ``ustar`` followed by a
    space and a NUL byte, and ``ustar`` followed by a NUL byte and ``00``.
    """

    OFFSET = 257
    _MAGIC_NUMBER_GNU = b"ustar \0"
    _MAGIC_NUMBER_POSIX = b"ustar\x0000"
    extension = "tar"

    @staticmethod
    def name():
        return "tar archive"
class ZipFileType(FileTypeInterface):
    """File type description for zip archives."""

    _MAGIC_NUMBER = b"PK\003\004"
    extension = "zip"

    @staticmethod
    def name():
        return "Zip archive data"


# Backward-compatible alias: the class was originally published under this
# misspelled name, and existing references must keep resolving.
ZipFleType = ZipFileType
# Collection of the archive and compression file type identifier classes
# recognized by Spack.
# extension_from_stream walks this list in order and reports the first
# class whose magic number matches the stream.
VALID_FILETYPES = [
BZipFileType,
ZCompressedFileType,
GZipFileType,
LzmaFileType,
TarFileType,
ZipFleType,
]
def extension_from_stream(stream, decompress=False):
    """Return extension represented by stream corresponding to archive file

    If stream does not represent an archive type recognized by Spack
    (see `spack.util.compression.ALLOWED_ARCHIVE_TYPES`) method will return None

    Extension type is derived by searching for identifying bytes
    in file stream.

    Args:
        stream : stream representing a file on system
        decompress (bool) : if True, compressed files are checked
                            for archive types beneath compression i.e. tar.gz
                            default is False, otherwise, return top level type i.e. gz

    Return:
        A string representing corresponding archive extension
        or None as relevant. None is also returned when a nested type
        cannot be derived and the stream has no ``name`` to parse (as is
        the case for the in-memory streams produced while peeking into
        compressed data); callers then apply their own fallback.
    """
    for arc_type in VALID_FILETYPES:
        if not arc_type.is_file_of_type(stream):
            continue

        suffix_ext = arc_type.extension
        prefix_ext = ""
        if arc_type.compressed and decompress:
            # stream represents compressed file
            # get decompressed stream (if possible)
            decomp_stream = arc_type.decomp_in_memory(stream)
            prefix_ext = extension_from_stream(decomp_stream, decompress=decompress)
            if not prefix_ext:
                # We were unable to decompress or unable to derive
                # a nested extension from decompressed file.
                # Try to use filename parsing to check for
                # potential nested extensions if there are any
                tty.debug(
                    "Cannot derive file extension from magic number;"
                    " falling back to regex path parsing."
                )
                # An in-memory stream has no file name; reading
                # `stream.name` unconditionally would raise AttributeError.
                stream_name = getattr(stream, "name", None)
                if stream_name is None:
                    return None
                return extension_from_path(stream_name)

        resultant_ext = suffix_ext if not prefix_ext else ".".join([prefix_ext, suffix_ext])
        tty.debug("File extension %s successfully derived by magic number." % resultant_ext)
        return resultant_ext
    return None
def extension_from_file(file, decompress=False):
    """Return extension from archive file path

    Extension is derived based on magic number parsing similar
    to the `file` utility. Attempts to return abbreviated file extensions
    whenever a file has an abbreviated extension such as `.tgz` or `.txz`.
    This distinction in abbreviated extension names is accomplished
    by string parsing.

    Args:
        file (os.PathLike): path describing file on system for which ext
            will be determined.
        decompress (bool): If True, method will peek into compressed
            files to check for archive file types. default is False.
            If false, method will be unable to distinguish `.tar.gz` from `.gz`
            or similar.

    Return:
        Spack recognized archive file extension as determined by file's magic number and
        file name. If file is not on system or is of a type not recognized by Spack as
        an archive or compression type, None is returned.
    """
    if not os.path.exists(file):
        return None

    with open(file, "rb") as f:
        ext = extension_from_stream(f, decompress)

    if not ext:
        # If unable to parse extension from stream,
        # attempt to fall back to string parsing
        return extension_from_path(file)

    if ext.startswith("tar."):
        # based on magic number, file is compressed
        # tar archive. Check to see if file is abbreviated as
        # t[xz|gz|bz2|bz]
        suf = ext.split(".")[1]
        abbreviations = ["t" + suf]
        if suf == "bz2":
            # "t" + suffix only yields "tbz2"; bzip2 tarballs are also
            # commonly abbreviated as "tbz"
            abbreviations.append("tbz")
        for abbr in abbreviations:
            if check_extension(file, abbr):
                return abbr
    return ext
def extension_from_path(path):
    """Get the allowed archive extension for a path.

    If path does not include a valid archive extension
    (see`spack.util.compression.ALLOWED_ARCHIVE_TYPES`) return None

    Raises:
        ValueError: if ``path`` is None
    """
    if path is None:
        raise ValueError("Can't call extension() on None")

    # Strip sourceforge suffix.
    sourceforge_download = re.search(r"((?:sourceforge.net|sf.net)/.*)/download$", path)
    if sourceforge_download:
        path = os.path.dirname(path)

    # ALLOWED_ARCHIVE_TYPES is scanned in order; the first hit wins
    for candidate in ALLOWED_ARCHIVE_TYPES:
        if re.search(r"\.%s$" % candidate, path) and check_extension(path, candidate):
            return candidate
    return None
def strip_last_extension(path):
    """Strips last supported archive extension from path

    Only the single-component extensions in
    ``ALLOWED_SINGLE_EXT_ARCHIVE_TYPES`` are tried; the first one whose
    removal changes the path wins. A falsy or unmatched path is returned
    unchanged.
    """
    if not path:
        return path
    for ext in ALLOWED_SINGLE_EXT_ARCHIVE_TYPES:
        stripped = check_and_remove_ext(path, ext)
        if stripped != path:
            return stripped
    return path
def strip_extension(path, ext=None):
    """Get the part of a path that does not include its compressed
    type extension.

    Args:
        path: path to strip
        ext: when given, the only extension considered; otherwise every
            entry of ``ALLOWED_ARCHIVE_TYPES`` is tried in order

    Returns:
        ``path`` without the matched extension, or ``path`` unchanged when
        nothing matched.
    """
    if not ext:
        for archive_type in ALLOWED_ARCHIVE_TYPES:
            stripped = check_and_remove_ext(path, archive_type)
            if stripped != path:
                return stripped
        return path
    return check_and_remove_ext(path, ext)
def check_extension(path, ext):
    """Check if extension is present in path

    Args:
        path: path or URL to inspect; a sourceforge ``/download`` suffix is
            ignored
        ext: either a plain extension such as ``tar.gz`` (matched literally
            at the end of the path, after a dot) or a regular expression
            beginning with an escaped dot, which is used as given

    Returns:
        True if the extension matches, False otherwise.
    """
    # Strip sourceforge suffix.
    prefix, _ = spath.find_sourceforge_suffix(path)
    if not ext.startswith(r"\."):
        # Escape the plain extension so that the "." in e.g. "tar.gz" only
        # matches a literal dot and not an arbitrary character.
        ext = r"\.%s$" % re.escape(ext)
    return bool(re.search(ext, prefix))
def reg_remove_ext(path, ext):
    """Regex remove ext from path

    Args:
        path: path to strip; returned unchanged when falsy
        ext: plain extension without the leading dot (e.g. ``tar.gz``);
            when falsy, ``path`` is returned unchanged

    Returns:
        ``path`` with a trailing ``.<ext>`` removed, if present.
    """
    if path and ext:
        # Escape the extension so that the "." in e.g. "tar.gz" only
        # matches a literal dot and not an arbitrary character.
        suffix = r"\.%s$" % re.escape(ext)
        return re.sub(suffix, "", path)
    return path
def check_and_remove_ext(path, ext):
    """If given extension is present in path, remove and return,
    otherwise just return path"""
    if not check_extension(path, ext):
        return path
    return reg_remove_ext(path, ext)

View File

@ -71,6 +71,15 @@ def win_exe_ext():
return ".exe"
def find_sourceforge_suffix(path):
    """Find and match sourceforge filepath components

    Returns:
        A 2-tuple. For a sourceforge.net / sf.net path ending in
        ``/download`` it is ``(path without the suffix, "/download")``;
        for any other path it is ``(path, "")``.
    """
    sourceforge = re.search(r"(.*(?:sourceforge\.net|sf\.net)/.*)(/download)$", path)
    return sourceforge.groups() if sourceforge else (path, "")
def path_to_os_path(*pths):
"""
Takes an arbitrary number of positional parameters