compression.py: refactor + bug fix (#42367)

Improve naming so it is clear that file "extensions" are not meant in the
`PurePath(path).suffix` sense, as the original function name suggested, but
rather that files are opened and their magic bytes are classified.

Add type hints.

Fix a bug where `stream.read(num_bytes)` was run on the compressed stream
instead of the uncompressed stream, which could break detection of tar.bz2
files.
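
A minimal sketch of the difference, using bz2 as an example (the helper names here are
illustrative, not the ones in `compression.py`):

import bz2
import io

def peek_uncompressed(stream: io.BufferedIOBase, num_bytes: int) -> bytes:
    # Fixed behaviour: read num_bytes of *decompressed* data, enough to reach the tar
    # magic, which sits a couple of hundred bytes into the decompressed archive.
    return bz2.BZ2File(stream).read(num_bytes)

def peek_compressed(stream: io.BufferedIOBase, num_bytes: int) -> bytes:
    # Buggy behaviour: only num_bytes of *compressed* input are decompressed, which can
    # yield too little data to reach the tar magic, so a tar.bz2 looks like plain bz2.
    return bz2.BZ2Decompressor().decompress(stream.read(num_bytes))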

Ensure that when peeking into streams for magic bytes, they are reset to
their original position upon return.
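
The pattern is roughly the following (the helper name is hypothetical; `matches_magic` and
the decompressed peek in the diff below do this inline):

import io
from typing import BinaryIO

def read_magic_preserving_position(stream: BinaryIO, offset: int, num_bytes: int) -> bytes:
    # Remember where the caller left the stream, jump to the magic-number offset, read,
    # and restore the original position before returning.
    original = stream.tell()
    try:
        stream.seek(offset)
        return stream.read(num_bytes)
    finally:
        stream.seek(original)

# The caller's stream position is untouched:
buf = io.BytesIO(b"\x1f\x8b\x08" + b"\x00" * 16)
assert read_magic_preserving_position(buf, 0, 3) == b"\x1f\x8b\x08"
assert buf.tell() == 0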

Use new API in `spack logs`.
Harmen Stoppels 2024-01-31 07:59:07 +01:00 committed by GitHub
parent 376653ec3d
commit 517dac6ff8
4 changed files with 287 additions and 383 deletions


@@ -199,6 +199,7 @@ def setup(sphinx):
("py:class", "contextlib.contextmanager"),
("py:class", "module"),
("py:class", "_io.BufferedReader"),
("py:class", "_io.BytesIO"),
("py:class", "unittest.case.TestCase"),
("py:class", "_frozen_importlib_external.SourceFileLoader"),
("py:class", "clingo.Control"),


@@ -5,11 +5,13 @@
import errno
import gzip
import io
import os
import shutil
import sys
import spack.cmd
import spack.spec
import spack.util.compression as compression
from spack.cmd.common import arguments
from spack.main import SpackCommandError
@@ -23,45 +25,36 @@ def setup_parser(subparser):
arguments.add_common_arguments(subparser, ["spec"])
def _dump_byte_stream_to_stdout(instream):
def _dump_byte_stream_to_stdout(instream: io.BufferedIOBase) -> None:
# Reopen stdout in binary mode so we don't have to worry about encoding
outstream = os.fdopen(sys.stdout.fileno(), "wb", closefd=False)
shutil.copyfileobj(instream, outstream)
def dump_build_log(package):
with open(package.log_path, "rb") as f:
_dump_byte_stream_to_stdout(f)
def _logs(cmdline_spec, concrete_spec):
def _logs(cmdline_spec: spack.spec.Spec, concrete_spec: spack.spec.Spec):
if concrete_spec.installed:
log_path = concrete_spec.package.install_log_path
elif os.path.exists(concrete_spec.package.stage.path):
dump_build_log(concrete_spec.package)
return
# TODO: `spack logs` can currently not show the logs while a package is being built, as the
# combined log file is only written after the build is finished.
log_path = concrete_spec.package.log_path
else:
raise SpackCommandError(f"{cmdline_spec} is not installed or staged")
try:
compression_ext = compression.extension_from_file(log_path)
with open(log_path, "rb") as fstream:
if compression_ext == "gz":
# If the log file is compressed, wrap it with a decompressor
fstream = gzip.open(log_path, "rb")
elif compression_ext:
raise SpackCommandError(
f"Unsupported storage format for {log_path}: {compression_ext}"
)
_dump_byte_stream_to_stdout(fstream)
stream = open(log_path, "rb")
except OSError as e:
if e.errno == errno.ENOENT:
raise SpackCommandError(f"No logs are available for {cmdline_spec}") from e
elif e.errno == errno.EPERM:
raise SpackCommandError(f"Permission error accessing {log_path}") from e
else:
raise
raise SpackCommandError(f"Error reading logs for {cmdline_spec}: {e}") from e
with stream as f:
ext = compression.extension_from_magic_numbers_by_stream(f, decompress=False)
if ext and ext != "gz":
raise SpackCommandError(f"Unsupported storage format for {log_path}: {ext}")
# If the log file is gzip compressed, wrap it with a decompressor
_dump_byte_stream_to_stdout(gzip.GzipFile(fileobj=f) if ext == "gz" else f)
def logs(parser, args):

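# Editor's note (not part of the diff): a stand-alone sketch of the pattern the new `_logs`
# code above uses -- classify the open stream without consuming it, then wrap it in a
# decompressor only when needed. The API call and the gz-only restriction mirror the diff;
# the helper name and error type here are illustrative.
import gzip
import shutil
import sys

import spack.util.compression as compression

def dump_possibly_gzipped_log(path: str) -> None:
    with open(path, "rb") as f:
        ext = compression.extension_from_magic_numbers_by_stream(f, decompress=False)
        if ext and ext != "gz":
            raise RuntimeError(f"Unsupported storage format for {path}: {ext}")
        # The magic-number check left the stream at its original position, so it can be
        # wrapped or copied from the start.
        shutil.copyfileobj(gzip.GzipFile(fileobj=f) if ext == "gz" else f, sys.stdout.buffer)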

@@ -4,8 +4,10 @@
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import io
import os
import shutil
import tarfile
from itertools import product
import pytest
@@ -14,7 +16,7 @@
from llnl.util.filesystem import working_dir
from spack.paths import spack_root
from spack.util import compression as scomp
from spack.util import compression
from spack.util.executable import CommandNotFoundError
datadir = os.path.join(spack_root, "lib", "spack", "spack", "test", "data", "compression")
@@ -30,15 +32,11 @@
native_archive_list = [key for key in ext_archive.keys() if "tar" not in key and "zip" not in key]
def support_stub():
return False
@pytest.fixture
def compr_support_check(monkeypatch):
monkeypatch.setattr(scomp, "is_lzma_supported", support_stub)
monkeypatch.setattr(scomp, "is_gzip_supported", support_stub)
monkeypatch.setattr(scomp, "is_bz2_supported", support_stub)
monkeypatch.setattr(compression, "LZMA_SUPPORTED", False)
monkeypatch.setattr(compression, "GZIP_SUPPORTED", False)
monkeypatch.setattr(compression, "BZ2_SUPPORTED", False)
@pytest.fixture
@@ -59,7 +57,7 @@ def archive_file_and_extension(tmpdir_factory, request):
)
def test_native_unpacking(tmpdir_factory, archive_file_and_extension):
archive_file, extension = archive_file_and_extension
util = scomp.decompressor_for(archive_file, extension)
util = compression.decompressor_for(archive_file, extension)
tmpdir = tmpdir_factory.mktemp("comp_test")
with working_dir(str(tmpdir)):
assert not os.listdir(os.getcwd())
@@ -78,7 +76,7 @@ def test_native_unpacking(tmpdir_factory, archive_file_and_extension):
def test_system_unpacking(tmpdir_factory, archive_file_and_extension, compr_support_check):
# actually run test
archive_file, _ = archive_file_and_extension
util = scomp.decompressor_for(archive_file)
util = compression.decompressor_for(archive_file)
tmpdir = tmpdir_factory.mktemp("system_comp_test")
with working_dir(str(tmpdir)):
assert not os.listdir(os.getcwd())
@@ -95,4 +93,25 @@ def test_unallowed_extension():
# are picked up by the linter and break style checks
bad_ext_archive = "Foo.cxx"
with pytest.raises(CommandNotFoundError):
scomp.decompressor_for(bad_ext_archive)
compression.decompressor_for(bad_ext_archive)
@pytest.mark.parametrize("ext", ["gz", "bz2", "xz"])
def test_file_type_check_does_not_advance_stream(tmp_path, ext):
# Create a tarball compressed with the given format
path = str(tmp_path / "compressed_tarball")
try:
with tarfile.open(path, f"w:{ext}") as tar:
tar.addfile(tarfile.TarInfo("test.txt"), fileobj=io.BytesIO(b"test"))
except tarfile.CompressionError:
pytest.skip(f"Cannot create tar.{ext} files")
# Classify the file from its magic bytes, and check that the stream is not advanced
with open(path, "rb") as f:
computed_ext = compression.extension_from_magic_numbers_by_stream(f, decompress=False)
assert computed_ext == ext
assert f.tell() == 0
computed_ext = compression.extension_from_magic_numbers_by_stream(f, decompress=True)
assert computed_ext == f"tar.{ext}"
assert f.tell() == 0


@@ -3,12 +3,13 @@
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import errno
import inspect
import io
import os
import re
import shutil
import sys
from typing import BinaryIO, Callable, Dict, List, Optional
import llnl.url
from llnl.util import tty
@@ -19,42 +20,29 @@
try:
import bz2 # noqa
_bz2_support = True
BZ2_SUPPORTED = True
except ImportError:
_bz2_support = False
BZ2_SUPPORTED = False
try:
import gzip # noqa
_gzip_support = True
GZIP_SUPPORTED = True
except ImportError:
_gzip_support = False
GZIP_SUPPORTED = False
try:
import lzma # noqa # novermin
_lzma_support = True
LZMA_SUPPORTED = True
except ImportError:
_lzma_support = False
LZMA_SUPPORTED = False
def is_lzma_supported():
return _lzma_support
def is_gzip_supported():
return _gzip_support
def is_bz2_supported():
return _bz2_support
def _system_untar(archive_file, remove_archive_file=False):
"""Returns path to unarchived tar file.
Untars archive via system tar.
def _system_untar(archive_file: str, remove_archive_file: bool = False) -> str:
"""Returns path to unarchived tar file. Untars archive via system tar.
Args:
archive_file (str): absolute path to the archive to be extracted.
@@ -69,6 +57,11 @@ def _system_untar(archive_file, remove_archive_file=False):
archive_file = archive_file_no_ext + "-input"
shutil.move(archive_file_no_ext, archive_file)
tar = which("tar", required=True)
# GNU tar's --no-same-owner is not as portable, -o works for BSD tar too. This flag is relevant
# when extracting archives as root, where tar attempts to set original ownership of files. This
# is redundant when distributing tarballs, as the tarballs are created on different systems
# than where they are extracted. In certain cases like rootless containers, setting original
# ownership is known to fail, so we need to disable it.
tar.add_default_arg("-oxf")
tar(archive_file)
if remove_archive_file:
@@ -79,21 +72,21 @@ def _system_untar(archive_file, remove_archive_file=False):
return outfile
def _bunzip2(archive_file):
def _bunzip2(archive_file: str) -> str:
"""Returns path to decompressed file.
Uses Python's bz2 module to decompress bz2 compressed archives
Fall back to system utility failing to find Python module `bz2`
Args:
archive_file (str): absolute path to the bz2 archive to be decompressed
archive_file: absolute path to the bz2 archive to be decompressed
"""
if is_bz2_supported():
if BZ2_SUPPORTED:
return _py_bunzip(archive_file)
else:
return _system_bunzip(archive_file)
def _py_bunzip(archive_file):
def _py_bunzip(archive_file: str) -> str:
"""Returns path to decompressed file.
Decompresses bz2 compressed archives/files via python's bz2 module"""
decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "bz2"))
@@ -106,7 +99,7 @@ def _py_bunzip(archive_file):
return archive_out
def _system_bunzip(archive_file):
def _system_bunzip(archive_file: str) -> str:
"""Returns path to decompressed file.
Decompresses bz2 compressed archives/files via system bzip2 utility"""
compressed_file_name = os.path.basename(archive_file)
@@ -121,25 +114,20 @@ def _system_bunzip(archive_file):
return archive_out
def _gunzip(archive_file):
"""Returns path to gunzip'd file
Decompresses `.gz` extensions. Prefer native Python `gzip` module.
Failing back to system utility gunzip.
Like gunzip, but extracts in the current working directory
instead of in-place.
def _gunzip(archive_file: str) -> str:
"""Returns path to gunzip'd file. Decompresses `.gz` extensions. Prefer native Python
`gzip` module. Falling back to system utility gunzip. Like gunzip, but extracts in the current
working directory instead of in-place.
Args:
archive_file (str): absolute path of the file to be decompressed
archive_file: absolute path of the file to be decompressed
"""
if is_gzip_supported():
return _py_gunzip(archive_file)
else:
return _system_gunzip(archive_file)
return _py_gunzip(archive_file) if GZIP_SUPPORTED else _system_gunzip(archive_file)
def _py_gunzip(archive_file):
"""Returns path to gunzip'd file
Decompresses `.gz` compressed archvies via python gzip module"""
def _py_gunzip(archive_file: str) -> str:
"""Returns path to gunzip'd file. Decompresses `.gz` compressed archvies via python gzip
module"""
decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "gz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
@@ -150,9 +138,8 @@ def _py_gunzip(archive_file):
return destination_abspath
def _system_gunzip(archive_file):
"""Returns path to gunzip'd file
Decompresses `.gz` compressed files via system gzip"""
def _system_gunzip(archive_file: str) -> str:
"""Returns path to gunzip'd file. Decompresses `.gz` compressed files via system gzip"""
archive_file_no_ext = llnl.url.strip_compression_extension(archive_file)
if archive_file_no_ext == archive_file:
# the zip file has no extension. On Unix gunzip cannot unzip onto itself
@@ -170,50 +157,38 @@ def _system_gunzip(archive_file):
return destination_abspath
def _unzip(archive_file):
"""Returns path to extracted zip archive
Extract Zipfile, searching for unzip system executable
If unavailable, search for 'tar' executable on system and use instead
def _unzip(archive_file: str) -> str:
"""Returns path to extracted zip archive. Extract Zipfile, searching for unzip system
executable. If unavailable, search for 'tar' executable on system and use instead.
Args:
archive_file (str): absolute path of the file to be decompressed
archive_file: absolute path of the file to be decompressed
"""
extracted_file = os.path.basename(llnl.url.strip_extension(archive_file, extension="zip"))
if sys.platform == "win32":
return _system_untar(archive_file)
else:
exe = "unzip"
arg = "-q"
unzip = which(exe, required=True)
unzip.add_default_arg(arg)
unzip(archive_file)
return extracted_file
unzip = which("unzip", required=True)
unzip.add_default_arg("-q")
unzip(archive_file)
return os.path.basename(llnl.url.strip_extension(archive_file, extension="zip"))
def _system_unZ(archive_file):
def _system_unZ(archive_file: str) -> str:
"""Returns path to decompressed file
Decompress UNIX compress style compression
Utilizes gunzip on unix and 7zip on Windows
"""
if sys.platform == "win32":
result = _system_7zip(archive_file)
else:
result = _system_gunzip(archive_file)
return result
return _system_7zip(archive_file)
return _system_gunzip(archive_file)
def _lzma_decomp(archive_file):
"""Returns path to decompressed xz file.
Decompress lzma compressed files. Prefer Python native
lzma module, but fall back on command line xz tooling
to find available Python support."""
if is_lzma_supported():
return _py_lzma(archive_file)
else:
return _xz(archive_file)
"""Returns path to decompressed xz file. Decompress lzma compressed files. Prefer Python native
lzma module, but fall back on command line xz tooling to find available Python support."""
return _py_lzma(archive_file) if LZMA_SUPPORTED else _xz(archive_file)
def _win_compressed_tarball_handler(decompressor):
def _win_compressed_tarball_handler(decompressor: Callable[[str], str]) -> Callable[[str], str]:
"""Returns function pointer to two stage decompression
and extraction method
Decompress and extract compressed tarballs on Windows.
@@ -227,7 +202,7 @@ def _win_compressed_tarball_handler(decompressor):
can be installed manually or via spack
"""
def unarchive(archive_file):
def unarchive(archive_file: str):
# perform intermediate extraction step
# record name of new archive so we can extract
decomped_tarball = decompressor(archive_file)
@@ -238,9 +213,9 @@ def unarchive(archive_file):
return unarchive
def _py_lzma(archive_file):
"""Returns path to decompressed .xz files
Decompress lzma compressed .xz files via python lzma module"""
def _py_lzma(archive_file: str) -> str:
"""Returns path to decompressed .xz files. Decompress lzma compressed .xz files via Python
lzma module."""
decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "xz"))
archive_out = os.path.join(os.getcwd(), decompressed_file)
with open(archive_out, "wb") as ar:
@@ -250,10 +225,8 @@ def _py_lzma(archive_file):
def _xz(archive_file):
"""Returns path to decompressed xz files
Decompress lzma compressed .xz files via xz command line
tool.
"""
"""Returns path to decompressed xz files. Decompress lzma compressed .xz files via xz command
line tool."""
decompressed_file = os.path.basename(llnl.url.strip_extension(archive_file, extension="xz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
@@ -292,19 +265,17 @@ def _system_7zip(archive_file):
return outfile
def decompressor_for(path, extension=None):
def decompressor_for(path: str, extension: Optional[str] = None):
"""Returns appropriate decompression/extraction algorithm function pointer
for provided extension. If extension is none, it is computed
from the `path` and the decompression function is derived
from that information."""
if not extension:
extension = extension_from_file(path, decompress=True)
extension = extension_from_magic_numbers(path, decompress=True)
if not llnl.url.allowed_archive(extension):
if not extension or not llnl.url.allowed_archive(extension):
raise CommandNotFoundError(
"Cannot extract archive, \
unrecognized file extension: '%s'"
% extension
f"Cannot extract {path}, unrecognized file extension: '{extension}'"
)
if sys.platform == "win32":
return decompressor_for_win(extension)
@@ -312,58 +283,37 @@ def decompressor_for(path, extension=None):
return decompressor_for_nix(extension)
def decompressor_for_nix(extension):
"""Returns a function pointer to appropriate decompression
algorithm based on extension type and unix specific considerations
i.e. a reasonable expectation system utils like gzip, bzip2, and xz are
available
def decompressor_for_nix(extension: str) -> Callable[[str], str]:
"""Returns a function pointer to appropriate decompression algorithm based on extension type
and unix specific considerations i.e. a reasonable expectation system utils like gzip, bzip2,
and xz are available
Args:
path (str): path of the archive file requiring decompression
extension: extension of the archive file requiring decompression
"""
if re.match(r"zip$", extension):
return _unzip
extension_to_decompressor: Dict[str, Callable[[str], str]] = {
"zip": _unzip,
"gz": _gunzip,
"bz2": _bunzip2,
"Z": _system_unZ, # no builtin support for .Z files
"xz": _lzma_decomp,
}
if re.match(r"gz$", extension):
return _gunzip
if re.match(r"bz2$", extension):
return _bunzip2
# Python does not have native support
# of any kind for .Z files. In these cases,
# we rely on external tools such as tar,
# 7z, or uncompressZ
if re.match(r"Z$", extension):
return _system_unZ
# Python and platform may not have support for lzma
# compression. If no lzma support, use tools available on systems
if re.match(r"xz$", extension):
return _lzma_decomp
return _system_untar
return extension_to_decompressor.get(extension, _system_untar)
def _determine_py_decomp_archive_strategy(extension):
def _determine_py_decomp_archive_strategy(extension: str) -> Optional[Callable[[str], str]]:
"""Returns appropriate python based decompression strategy
based on extension type"""
# Only rely on Python decompression support for gz
if re.match(r"gz$", extension):
return _py_gunzip
# Only rely on Python decompression support for bzip2
if re.match(r"bz2$", extension):
return _py_bunzip
# Only rely on Python decompression support for xz
if re.match(r"xz$", extension):
return _py_lzma
return None
extension_to_decompressor: Dict[str, Callable[[str], str]] = {
"gz": _py_gunzip,
"bz2": _py_bunzip,
"xz": _py_lzma,
}
return extension_to_decompressor.get(extension, None)
def decompressor_for_win(extension):
def decompressor_for_win(extension: str) -> Callable[[str], str]:
"""Returns a function pointer to appropriate decompression
algorithm based on extension type and Windows specific considerations
@@ -371,34 +321,32 @@ def decompressor_for_win(extension):
So we must rely exclusively on Python module support for all compression
operations, tar for tarballs and zip files, and 7zip for Z compressed archives
and files as Python does not provide support for the UNIX compress algorithm
Args:
path (str): path of the archive file requiring decompression
extension (str): extension
"""
extension = llnl.url.expand_contracted_extension(extension)
# Windows native tar can handle .zip extensions, use standard
# unzip method
if re.match(r"zip$", extension):
return _unzip
extension_to_decompressor: Dict[str, Callable[[str], str]] = {
# Windows native tar can handle .zip extensions, use standard unzip method
"zip": _unzip,
# if extension is standard tarball, invoke Windows native tar
"tar": _system_untar,
# Python does not have native support of any kind for .Z files. In these cases, we rely on
# 7zip, which must be installed outside of Spack and added to the PATH or externally
# detected
"Z": _system_unZ,
"xz": _lzma_decomp,
}
# if extension is standard tarball, invoke Windows native tar
if re.match(r"tar$", extension):
return _system_untar
decompressor = extension_to_decompressor.get(extension)
if decompressor:
return decompressor
# Python does not have native support
# of any kind for .Z files. In these cases,
# we rely on 7zip, which must be installed outside
# of spack and added to the PATH or externally detected
if re.match(r"Z$", extension):
return _system_unZ
# Windows vendors no native decompression tools, attempt to derive
# python based decompression strategy
# Expand extension from contracted extension i.e. tar.gz from .tgz
# no-op on non contracted extensions
# Windows vendors no native decompression tools, attempt to derive Python based decompression
# strategy. Expand extension from abbreviated ones, i.e. tar.gz from .tgz
compression_extension = llnl.url.compression_ext_from_compressed_archive(extension)
decompressor = _determine_py_decomp_archive_strategy(compression_extension)
decompressor = (
_determine_py_decomp_archive_strategy(compression_extension)
if compression_extension
else None
)
if not decompressor:
raise SpackError(
"Spack was unable to determine a proper decompression strategy for"
@@ -412,103 +360,75 @@ def decompressor_for_win(extension):
class FileTypeInterface:
"""
Base interface class for describing and querying file type information.
FileType describes information about a single file type
such as extension, and byte header properties, and provides an interface
to check a given file against said type based on magic number.
"""Base interface class for describing and querying file type information. FileType describes
information about a single file type such as typical extension and byte header properties,
and provides an interface to check a given file against said type based on magic number.
This class should be subclassed each time a new type is to be
described.
This class should be subclassed each time a new type is to be described.
Note: This class should not be used directly as it does not define any specific
file. Attempts to directly use this class will fail, as it does not define
a magic number or extension string.
Subclasses should each describe a different
type of file. In order to do so, they must define
the extension string, magic number, and header offset (if non zero).
If a class has multiple magic numbers, it will need to
override the method describin that file types magic numbers and
the method that checks a types magic numbers against a given file's.
"""
Subclasses should each describe a different type of file. In order to do so, they must define
the extension string, magic number, and header offset (if non zero). If a class has multiple
magic numbers, it will need to override the method describing that file type's magic numbers
and the method that checks a type's magic numbers against a given file's."""
OFFSET = 0
compressed = False
@staticmethod
def name():
raise NotImplementedError
extension: str
name: str
@classmethod
def magic_number(cls):
def magic_numbers(cls) -> List[bytes]:
"""Return a list of all potential magic numbers for a filetype"""
return [x[1] for x in inspect.getmembers(cls) if x[0].startswith("_MAGIC_NUMBER")]
return [
value for name, value in inspect.getmembers(cls) if name.startswith("_MAGIC_NUMBER")
]
@classmethod
def header_size(cls):
def header_size(cls) -> int:
"""Return size of largest magic number associated with file type"""
return max([len(x) for x in cls.magic_number()])
return max(len(x) for x in cls.magic_numbers())
@classmethod
def _bytes_check(cls, magic_bytes):
for magic in cls.magic_number():
if magic_bytes.startswith(magic):
return True
return False
@classmethod
def is_file_of_type(cls, iostream):
"""Query byte stream for appropriate magic number
def matches_magic(self, stream: BinaryIO) -> bool:
"""Returns true if the stream matches the current file type by any of its magic numbers.
Resets stream to original position.
Args:
iostream: file byte stream
Returns:
Bool denoting whether file is of class file type
based on magic number
stream: file byte stream
"""
if not iostream:
return False
# move to location of magic bytes
iostream.seek(cls.OFFSET)
magic_bytes = iostream.read(cls.header_size())
# return to beginning of file
iostream.seek(0)
if cls._bytes_check(magic_bytes):
return True
return False
offset = stream.tell()
stream.seek(self.OFFSET)
magic_bytes = stream.read(self.header_size())
stream.seek(offset)
return any(magic_bytes.startswith(magic) for magic in self.magic_numbers())
class CompressedFileTypeInterface(FileTypeInterface):
"""Interface class for FileTypes that include compression information"""
compressed = True
def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]:
"""This method returns the first num_bytes of a decompressed stream. Returns None if no
builtin support for decompression."""
return None
@staticmethod
def decomp_in_memory(stream):
"""This method decompresses and loads the first 200 or so bytes of a compressed file
to check for compressed archives. This does not decompress the entire file and should
not be used for direct expansion of archives/compressed files
"""
raise NotImplementedError("Implementation by compression subclass required")
def _decompressed_peek(
decompressed_stream: io.BufferedIOBase, stream: BinaryIO, num_bytes: int
) -> io.BytesIO:
# Read the first num_bytes of the decompressed stream, do not advance the stream position.
pos = stream.tell()
data = decompressed_stream.read(num_bytes)
stream.seek(pos)
return io.BytesIO(data)
class BZipFileType(CompressedFileTypeInterface):
_MAGIC_NUMBER = b"\x42\x5a\x68"
extension = "bz2"
name = "bzip2 compressed data"
@staticmethod
def name():
return "bzip2 compressed data"
@staticmethod
def decomp_in_memory(stream):
if is_bz2_supported():
# checking for underlying archive, only decomp as many bytes
# as is absolutely neccesary for largest archive header (tar)
comp_stream = stream.read(TarFileType.OFFSET + TarFileType.header_size())
return io.BytesIO(initial_bytes=bz2.BZ2Decompressor().decompress(comp_stream))
def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]:
if BZ2_SUPPORTED:
return _decompressed_peek(bz2.BZ2File(stream), stream, num_bytes)
return None
@@ -516,57 +436,28 @@ class ZCompressedFileType(CompressedFileTypeInterface):
_MAGIC_NUMBER_LZW = b"\x1f\x9d"
_MAGIC_NUMBER_LZH = b"\x1f\xa0"
extension = "Z"
@staticmethod
def name():
return "compress'd data"
@staticmethod
def decomp_in_memory(stream):
# python has no method of decompressing `.Z` files in memory
return None
name = "compress'd data"
class GZipFileType(CompressedFileTypeInterface):
_MAGIC_NUMBER = b"\x1f\x8b\x08"
extension = "gz"
name = "gzip compressed data"
@staticmethod
def name():
return "gzip compressed data"
@staticmethod
def decomp_in_memory(stream):
if is_gzip_supported():
# checking for underlying archive, only decomp as many bytes
# as is absolutely neccesary for largest archive header (tar)
return io.BytesIO(
initial_bytes=gzip.GzipFile(fileobj=stream).read(
TarFileType.OFFSET + TarFileType.header_size()
)
)
def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]:
if GZIP_SUPPORTED:
return _decompressed_peek(gzip.GzipFile(fileobj=stream), stream, num_bytes)
return None
class LzmaFileType(CompressedFileTypeInterface):
_MAGIC_NUMBER = b"\xfd7zXZ"
extension = "xz"
name = "xz compressed data"
@staticmethod
def name():
return "xz compressed data"
@staticmethod
def decomp_in_memory(stream):
if is_lzma_supported():
# checking for underlying archive, only decomp as many bytes
# as is absolutely neccesary for largest archive header (tar)
max_size = TarFileType.OFFSET + TarFileType.header_size()
return io.BytesIO(
initial_bytes=lzma.LZMADecompressor().decompress(
stream.read(max_size), max_length=max_size
)
)
def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]:
if LZMA_SUPPORTED:
return _decompressed_peek(lzma.LZMAFile(stream), stream, num_bytes)
return None
@@ -575,111 +466,111 @@ class TarFileType(FileTypeInterface):
_MAGIC_NUMBER_GNU = b"ustar \0"
_MAGIC_NUMBER_POSIX = b"ustar\x0000"
extension = "tar"
@staticmethod
def name():
return "tar archive"
name = "tar archive"
class ZipFleType(FileTypeInterface):
_MAGIC_NUMBER = b"PK\003\004"
extension = "zip"
@staticmethod
def name():
return "Zip archive data"
name = "Zip archive data"
# collection of valid Spack recognized archive and compression
# file type identifier classes.
VALID_FILETYPES = [
BZipFileType,
ZCompressedFileType,
GZipFileType,
LzmaFileType,
TarFileType,
ZipFleType,
#: Maximum number of bytes to read from a file to determine any archive type. Tar is the largest.
MAX_BYTES_ARCHIVE_HEADER = TarFileType.OFFSET + TarFileType.header_size()
#: Collection of supported archive and compression file type identifier classes.
SUPPORTED_FILETYPES: List[FileTypeInterface] = [
BZipFileType(),
ZCompressedFileType(),
GZipFileType(),
LzmaFileType(),
TarFileType(),
ZipFleType(),
]
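# Editor's note (not part of the diff): per the FileTypeInterface docstring above, supporting
# another format would only require a subclass defining its magic number(s), typical extension
# and name, plus an entry in SUPPORTED_FILETYPES. A hypothetical example -- this change does
# not add lz4 support:
class Lz4FileType(CompressedFileTypeInterface):
    _MAGIC_NUMBER = b"\x04\x22\x4d\x18"  # lz4 frame magic
    extension = "lz4"
    name = "lz4 compressed data"
    # The standard library cannot decompress lz4, so the inherited peek() returning None is
    # kept, and classification of the archive underneath falls back to name-based parsing.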
def extension_from_stream(stream, decompress=False):
"""Return extension represented by stream corresponding to archive file
If stream does not represent an archive type recongized by Spack
(see `spack.util.compression.ALLOWED_ARCHIVE_TYPES`) method will return None
Extension type is derived by searching for identifying bytes
in file stream.
def _extension_of_compressed_file(
file_type: CompressedFileTypeInterface, stream: BinaryIO
) -> Optional[str]:
"""Retrieves the extension of a file after decompression from its magic numbers, if it can be
decompressed."""
# To classify the file we only need to decompress the first so many bytes.
decompressed_magic = file_type.peek(stream, MAX_BYTES_ARCHIVE_HEADER)
if not decompressed_magic:
return None
return extension_from_magic_numbers_by_stream(decompressed_magic, decompress=False)
def extension_from_magic_numbers_by_stream(
stream: BinaryIO, decompress: bool = False
) -> Optional[str]:
"""Returns the typical extension for the opened file, without leading ``.``, based on its magic
numbers.
If the stream does not represent a file type recognized by Spack (see
:py:data:`SUPPORTED_FILETYPES`), the method will return None
Args:
stream : stream representing a file on system
decompress (bool) : if True, compressed files are checked
for archive types beneath compression i.e. tar.gz
default is False, otherwise, return top level type i.e. gz
Return:
A string represting corresponding archive extension
or None as relevant.
"""
for arc_type in VALID_FILETYPES:
if arc_type.is_file_of_type(stream):
suffix_ext = arc_type.extension
prefix_ext = ""
if arc_type.compressed and decompress:
# stream represents compressed file
# get decompressed stream (if possible)
decomp_stream = arc_type.decomp_in_memory(stream)
prefix_ext = extension_from_stream(decomp_stream, decompress=decompress)
if not prefix_ext:
# We were unable to decompress or unable to derive
# a nested extension from decompressed file.
# Try to use filename parsing to check for
# potential nested extensions if there are any
tty.debug(
"Cannot derive file extension from magic number;"
" falling back to regex path parsing."
)
return llnl.url.extension_from_path(stream.name)
resultant_ext = suffix_ext if not prefix_ext else ".".join([prefix_ext, suffix_ext])
tty.debug("File extension %s successfully derived by magic number." % resultant_ext)
return resultant_ext
stream: stream representing a file on system
decompress: if True, compressed files are checked for archive types beneath compression.
For example tar.gz if True versus only gz if False."""
for file_type in SUPPORTED_FILETYPES:
if not file_type.matches_magic(stream):
continue
ext = file_type.extension
if decompress and isinstance(file_type, CompressedFileTypeInterface):
uncompressed_ext = _extension_of_compressed_file(file_type, stream)
if not uncompressed_ext:
tty.debug(
"Cannot derive file extension from magic number;"
" falling back to original file name."
)
return llnl.url.extension_from_path(stream.name)
ext = f"{uncompressed_ext}.{ext}"
tty.debug(f"File extension {ext} successfully derived by magic number.")
return ext
return None
def extension_from_file(file, decompress=False):
"""Return extension from archive file path
Extension is derived based on magic number parsing similar
to the `file` utility. Attempts to return abbreviated file extensions
whenever a file has an abbreviated extension such as `.tgz` or `.txz`.
This distinction in abbreivated extension names is accomplished
by string parsing.
def _maybe_abbreviate_extension(path: str, extension: str) -> str:
"""If the file is a compressed tar archive, return the abbreviated extension t[xz|gz|bz2|bz]
instead of tar.[xz|gz|bz2|bz] if the file's original name also has an abbreviated extension."""
if not extension.startswith("tar."):
return extension
abbr = f"t{extension[4:]}"
return abbr if llnl.url.has_extension(path, abbr) else extension
def extension_from_magic_numbers(path: str, decompress: bool = False) -> Optional[str]:
"""Return typical extension without leading ``.`` of a compressed file or archive at the given
path, based on its magic numbers, similar to the `file` utility. Notice that the extension
returned from this function may not coincide with the file's given extension.
Args:
file (os.PathLike): path descibing file on system for which ext
will be determined.
decompress (bool): If True, method will peek into compressed
files to check for archive file types. default is False.
If false, method will be unable to distinguish `.tar.gz` from `.gz`
or similar.
Return:
Spack recognized archive file extension as determined by file's magic number and
file name. If file is not on system or is of an type not recognized by Spack as
an archive or compression type, None is returned.
path: file to determine extension of
decompress: If True, method will peek into decompressed file to check for archive file
types. If False, the method will return only the top-level extension (for example
``gz`` and not ``tar.gz``).
Returns:
Spack recognized archive file extension as determined by file's magic number and file name.
If file is not on system or is of a type not recognized by Spack as an archive or
compression type, None is returned. If the file is classified as a compressed tarball, the
extension is abbreviated (for instance ``tgz`` not ``tar.gz``) if that matches the file's
given extension.
"""
if os.path.exists(file):
with open(file, "rb") as f:
ext = extension_from_stream(f, decompress)
# based on magic number, file is compressed
# tar archive. Check to see if file is abbreviated as
# t[xz|gz|bz2|bz]
if ext and ext.startswith("tar."):
suf = ext.split(".")[1]
abbr = "t" + suf
if llnl.url.has_extension(file, abbr):
return abbr
if not ext:
# If unable to parse extension from stream,
# attempt to fall back to string parsing
ext = llnl.url.extension_from_path(file)
return ext
return None
try:
with open(path, "rb") as f:
ext = extension_from_magic_numbers_by_stream(f, decompress)
except OSError as e:
if e.errno == errno.ENOENT:
return None
raise
# Return the extension derived from the magic number if possible.
if ext:
return _maybe_abbreviate_extension(path, ext)
# Otherwise, use the extension from the file name.
return llnl.url.extension_from_path(path)
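# Editor's usage sketch (not part of the diff): how callers use the renamed API. Assumes
# Spack's modules are importable and that "archive.tar.gz" is an actual gzip-compressed
# tarball on disk.
import spack.util.compression as compression

# From a path: peek at the file's magic numbers, optionally looking underneath the
# compression layer; name-based parsing is the fallback when classification fails.
compression.extension_from_magic_numbers("archive.tar.gz", decompress=True)   # "tar.gz"
compression.extension_from_magic_numbers("archive.tar.gz", decompress=False)  # "gz"

# From an already-open stream: the stream position is left where it was.
with open("archive.tar.gz", "rb") as f:
    assert compression.extension_from_magic_numbers_by_stream(f, decompress=True) == "tar.gz"
    assert f.tell() == 0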