Decompression: fix naming issues (#37749)

* When using system tools to unpack a .gz file, the input file needs a
  different name than the output file. Normally, we generate this new
  name by stripping the .gz extension from the file name.
  This was not sufficient if the file name did not have an extension,
  so we temporarily rename the file in that case.
* When using the system tar utility to untar on Windows, we were (erroneously)
  skipping the actual untar step if the filename was lacking a .tar
  extension.
* For foo.txz, we were not changing the extension of the decompressed file
  (i.e. we would decompress foo.txz to foo.txz). This did not cause any
  problems, but is confusing, so has been updated such that the output
  filename reflects its decompressed state (i.e. foo.tar).
* Added a test for strip_compression_extension.
* Updated test_native_unpacking to test each archive type with and without
  an extension as part of the file name (e.g. we test "foo.tar.gz", but
  also make sure we decompress properly if it is named "foo").
This commit is contained in:
Dan Lipsa 2023-07-17 17:33:18 -04:00 committed by GitHub
parent f05837a480
commit 4831d45852
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 74 additions and 33 deletions

View File

@ -3,9 +3,11 @@
# #
# SPDX-License-Identifier: (Apache-2.0 OR MIT) # SPDX-License-Identifier: (Apache-2.0 OR MIT)
import os import os
import shutil import shutil
import sys import sys
from itertools import product
import pytest import pytest
@ -40,18 +42,24 @@ def compr_support_check(monkeypatch):
@pytest.fixture @pytest.fixture
def archive_file(tmpdir_factory, request): def archive_file_and_extension(tmpdir_factory, request):
"""Copy example archive to temp directory for test""" """Copy example archive to temp directory into an extension-less file for test"""
archive_file_stub = os.path.join(datadir, "Foo") archive_file_stub = os.path.join(datadir, "Foo")
extension = request.param extension, add_extension = request.param
tmpdir = tmpdir_factory.mktemp("compression") tmpdir = tmpdir_factory.mktemp("compression")
shutil.copy(archive_file_stub + "." + extension, str(tmpdir)) tmp_archive_file = os.path.join(
return os.path.join(str(tmpdir), "Foo.%s" % extension) str(tmpdir), "Foo" + (("." + extension) if add_extension else "")
)
shutil.copy(archive_file_stub + "." + extension, tmp_archive_file)
return (tmp_archive_file, extension)
@pytest.mark.parametrize("archive_file", native_archive_list, indirect=True) @pytest.mark.parametrize(
def test_native_unpacking(tmpdir_factory, archive_file): "archive_file_and_extension", product(native_archive_list, [True, False]), indirect=True
util = scomp.decompressor_for(archive_file) )
def test_native_unpacking(tmpdir_factory, archive_file_and_extension):
archive_file, extension = archive_file_and_extension
util = scomp.decompressor_for(archive_file, extension)
tmpdir = tmpdir_factory.mktemp("comp_test") tmpdir = tmpdir_factory.mktemp("comp_test")
with working_dir(str(tmpdir)): with working_dir(str(tmpdir)):
assert not os.listdir(os.getcwd()) assert not os.listdir(os.getcwd())
@ -64,9 +72,12 @@ def test_native_unpacking(tmpdir_factory, archive_file):
@pytest.mark.skipif(sys.platform == "win32", reason="Only Python unpacking available on Windows") @pytest.mark.skipif(sys.platform == "win32", reason="Only Python unpacking available on Windows")
@pytest.mark.parametrize("archive_file", ext_archive.keys(), indirect=True) @pytest.mark.parametrize(
def test_system_unpacking(tmpdir_factory, archive_file, compr_support_check): "archive_file_and_extension", [(ext, True) for ext in ext_archive.keys()], indirect=True
)
def test_system_unpacking(tmpdir_factory, archive_file_and_extension, compr_support_check):
# actually run test # actually run test
archive_file, _ = archive_file_and_extension
util = scomp.decompressor_for(archive_file) util = scomp.decompressor_for(archive_file)
tmpdir = tmpdir_factory.mktemp("system_comp_test") tmpdir = tmpdir_factory.mktemp("system_comp_test")
with working_dir(str(tmpdir)): with working_dir(str(tmpdir)):
@ -102,3 +113,21 @@ def test_get_bad_extension():
@pytest.mark.parametrize("path", ext_archive.values()) @pytest.mark.parametrize("path", ext_archive.values())
def test_allowed_archive(path): def test_allowed_archive(path):
assert scomp.allowed_archive(path) assert scomp.allowed_archive(path)
@pytest.mark.parametrize("ext_path", ext_archive.items())
def test_strip_compression_extension(ext_path):
    """Check what remains after stripping the compression extension.

    ``ext_path`` is an ``(extension, path)`` pair taken from ``ext_archive``.
    Three outcomes are checked:

    * ``zip`` is left untouched by default and is only stripped when ``"zip"``
      is passed explicitly as the extension to remove.
    * plain tar files, contracted extensions (the keys of
      ``scomp.CONTRACTION_MAP``) and ``<PRE_EXTS>.<EXTS>`` combinations must
      come back as a tar file name.
    * every other extension is stripped down to the bare stem.
    """
    archive_ext, archive_path = ext_path
    default_result = scomp.strip_compression_extension(archive_path)

    if archive_ext == "zip":
        # Not stripped by default ...
        assert default_result == "Foo.zip"
        # ... but stripped when requested explicitly.
        assert scomp.strip_compression_extension(archive_path, "zip") == "Foo"
        return

    def leaves_tar_file():
        # Evaluated lazily, in the same order as a chained ``or`` would be.
        if archive_ext == "tar":
            return True
        if archive_ext in scomp.CONTRACTION_MAP:
            return True
        compound_exts = [".".join(pair) for pair in product(scomp.PRE_EXTS, scomp.EXTS)]
        return archive_ext in compound_exts

    if leaves_tar_file():
        assert default_result in ("Foo.tar", "Foo.TAR")
    else:
        assert default_result == "Foo"

View File

@ -21,6 +21,7 @@
PRE_EXTS = ["tar", "TAR"] PRE_EXTS = ["tar", "TAR"]
EXTS = ["gz", "bz2", "xz", "Z"] EXTS = ["gz", "bz2", "xz", "Z"]
NOTAR_EXTS = ["zip", "tgz", "tbz2", "tbz", "txz"] NOTAR_EXTS = ["zip", "tgz", "tbz2", "tbz", "txz"]
CONTRACTION_MAP = {"tgz": "tar.gz", "txz": "tar.xz", "tbz": "tar.bz2", "tbz2": "tar.bz2"}
# Add PRE_EXTS and EXTS last so that .tar.gz is matched *before* .tar or .gz # Add PRE_EXTS and EXTS last so that .tar.gz is matched *before* .tar or .gz
ALLOWED_ARCHIVE_TYPES = ( ALLOWED_ARCHIVE_TYPES = (
@ -77,8 +78,14 @@ def _system_untar(archive_file):
archive_file (str): absolute path to the archive to be extracted. archive_file (str): absolute path to the archive to be extracted.
Can be one of .tar(.[gz|bz2|xz|Z]) or .(tgz|tbz|tbz2|txz). Can be one of .tar(.[gz|bz2|xz|Z]) or .(tgz|tbz|tbz2|txz).
""" """
outfile = os.path.basename(strip_extension(archive_file, "tar")) archive_file_no_ext = strip_extension(archive_file)
outfile = os.path.basename(archive_file_no_ext)
if archive_file_no_ext == archive_file:
# the archive file has no extension. Tar on windows cannot untar onto itself
# archive_file can be a tar file (which causes the problem on windows) but it can
# also have other extensions (on Unix) such as tgz, tbz2, ...
archive_file = archive_file_no_ext + "-input"
shutil.move(archive_file_no_ext, archive_file)
tar = which("tar", required=True) tar = which("tar", required=True)
tar.add_default_arg("-oxf") tar.add_default_arg("-oxf")
tar(archive_file) tar(archive_file)
@ -159,7 +166,12 @@ def _py_gunzip(archive_file):
def _system_gunzip(archive_file): def _system_gunzip(archive_file):
"""Returns path to gunzip'd file """Returns path to gunzip'd file
Decompresses `.gz` compressed files via system gzip""" Decompresses `.gz` compressed files via system gzip"""
decompressed_file = os.path.basename(strip_compression_extension(archive_file, "gz")) archive_file_no_ext = strip_compression_extension(archive_file)
if archive_file_no_ext == archive_file:
# the zip file has no extension. On Unix gunzip cannot unzip onto itself
archive_file = archive_file + ".gz"
shutil.move(archive_file_no_ext, archive_file)
decompressed_file = os.path.basename(archive_file_no_ext)
working_dir = os.getcwd() working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file) destination_abspath = os.path.join(working_dir, decompressed_file)
compressed_file = os.path.basename(archive_file) compressed_file = os.path.basename(archive_file)
@ -233,14 +245,12 @@ def unarchive(archive_file):
# record name of new archive so we can extract # record name of new archive so we can extract
# and later clean up # and later clean up
decomped_tarball = decompressor(archive_file) decomped_tarball = decompressor(archive_file)
if check_extension(decomped_tarball, "tar"): # run tar on newly decomped archive
# run tar on newly decomped archive outfile = _system_untar(decomped_tarball)
outfile = _system_untar(decomped_tarball) # clean intermediate archive to mimic end result
# clean intermediate archive to mimic end result # produced by one shot decomp/extraction
# produced by one shot decomp/extraction os.remove(decomped_tarball)
os.remove(decomped_tarball) return outfile
return outfile
return decomped_tarball
return unarchive return unarchive
@ -248,7 +258,7 @@ def unarchive(archive_file):
def _py_lzma(archive_file): def _py_lzma(archive_file):
"""Returns path to decompressed .xz files """Returns path to decompressed .xz files
Decompress lzma compressed .xz files via python lzma module""" Decompress lzma compressed .xz files via python lzma module"""
decompressed_file = os.path.basename(strip_extension(archive_file, "xz")) decompressed_file = os.path.basename(strip_compression_extension(archive_file, "xz"))
archive_out = os.path.join(os.getcwd(), decompressed_file) archive_out = os.path.join(os.getcwd(), decompressed_file)
with open(archive_out, "wb") as ar: with open(archive_out, "wb") as ar:
with lzma.open(archive_file) as lar: with lzma.open(archive_file) as lar:
@ -707,15 +717,18 @@ def extension_from_path(path):
def strip_compression_extension(path, ext=None): def strip_compression_extension(path, ext=None):
"""Returns path with last supported or provided archive extension stripped""" """Returns path with last supported (can be combined with tar) or
path = expand_contracted_extension_in_path(path) provided archive extension stripped"""
exts_to_check = EXTS path_ext = extension_from_path(path)
if ext: if path_ext:
exts_to_check = [ext] path = expand_contracted_extension_in_path(path)
for ext_check in exts_to_check: exts_to_check = EXTS
mod_path = check_and_remove_ext(path, ext_check) if ext:
if mod_path != path: exts_to_check = [ext]
return mod_path for ext_check in exts_to_check:
mod_path = check_and_remove_ext(path, ext_check)
if mod_path != path:
return mod_path
return path return path
@ -781,8 +794,7 @@ def expand_contracted_extension(extension):
"""Return expanded version of contracted extension """Return expanded version of contracted extension
i.e. .tgz -> .tar.gz, no op on non contracted extensions""" i.e. .tgz -> .tar.gz, no op on non contracted extensions"""
extension = extension.strip(".") extension = extension.strip(".")
contraction_map = {"tgz": "tar.gz", "txz": "tar.xz", "tbz": "tar.bz2", "tbz2": "tar.bz2"} return CONTRACTION_MAP.get(extension, extension)
return contraction_map.get(extension, extension)
def compression_ext_from_compressed_archive(extension): def compression_ext_from_compressed_archive(extension):