filter_file: fix various bugs (#48038)

* `f.tell` on a `TextIOWrapper` does not return the offset in bytes, but
  an opaque integer that can only be used for `f.seek` on the same
  object. Spack assumes it's a byte offset.
* Do not open in a locale dependent way, but assume utf-8 (and allow
  users to override that)
* Use tempfile to generate a backup/temporary file in a safe way
* Comparison between None and str is valid and on purpose.
This commit is contained in:
Harmen Stoppels 2024-12-12 20:07:39 +01:00 committed by Harmen Stoppels
parent bee2132c04
commit f124409d8a

View File

@ -301,35 +301,32 @@ def filter_file(
ignore_absent: bool = False,
start_at: Optional[str] = None,
stop_at: Optional[str] = None,
encoding: Optional[str] = "utf-8",
) -> None:
r"""Like sed, but uses python regular expressions.
Filters every line of each file through regex and replaces the file
with a filtered version. Preserves mode of filtered files.
Filters every line of each file through regex and replaces the file with a filtered version.
Preserves mode of filtered files.
As with re.sub, ``repl`` can be either a string or a callable.
If it is a callable, it is passed the match object and should
return a suitable replacement string. If it is a string, it
can contain ``\1``, ``\2``, etc. to represent back-substitution
as sed would allow.
As with re.sub, ``repl`` can be either a string or a callable. If it is a callable, it is
passed the match object and should return a suitable replacement string. If it is a string, it
can contain ``\1``, ``\2``, etc. to represent back-substitution as sed would allow.
Args:
regex (str): The regular expression to search for
repl (str): The string to replace matches with
*filenames: One or more files to search and replace
string (bool): Treat regex as a plain string. Default it False
backup (bool): Make backup file(s) suffixed with ``~``. Default is False
ignore_absent (bool): Ignore any files that don't exist.
Default is False
start_at (str): Marker used to start applying the replacements. If a
text line matches this marker filtering is started at the next line.
All contents before the marker and the marker itself are copied
verbatim. Default is to start filtering from the first line of the
file.
stop_at (str): Marker used to stop scanning the file further. If a text
line matches this marker filtering is stopped and the rest of the
file is copied verbatim. Default is to filter until the end of the
file.
regex: The regular expression to search for
repl: The string to replace matches with
*filenames: One or more files to search and replace string: Treat regex as a plain string.
Default it False backup: Make backup file(s) suffixed with ``~``. Default is False
ignore_absent: Ignore any files that don't exist. Default is False
start_at: Marker used to start applying the replacements. If a text line matches this
marker filtering is started at the next line. All contents before the marker and the
marker itself are copied verbatim. Default is to start filtering from the first line of
the file.
stop_at: Marker used to stop scanning the file further. If a text line matches this marker
filtering is stopped and the rest of the file is copied verbatim. Default is to filter
until the end of the file.
encoding: The encoding to use when reading and writing the files. Default is None, which
uses the system's default encoding.
"""
# Allow strings to use \1, \2, etc. for replacement, like sed
if not callable(repl):
@ -345,72 +342,54 @@ def groupid_to_group(x):
if string:
regex = re.escape(regex)
for filename in path_to_os_path(*filenames):
msg = 'FILTER FILE: {0} [replacing "{1}"]'
tty.debug(msg.format(filename, regex))
regex_compiled = re.compile(regex)
for path in path_to_os_path(*filenames):
fd, temp_path = tempfile.mkstemp(prefix=os.path.basename(path), dir=os.path.dirname(path))
os.close(fd)
backup_filename = filename + "~"
tmp_filename = filename + ".spack~"
if ignore_absent and not os.path.exists(filename):
msg = 'FILTER FILE: file "{0}" not found. Skipping to next file.'
tty.debug(msg.format(filename))
if ignore_absent and not os.path.exists(path):
tty.debug(f'FILTER FILE: file "{path}" not found. Skipping to next file.')
continue
else:
tty.debug(f'FILTER FILE: {path} [replacing "{regex}"]')
# Create backup file. Don't overwrite an existing backup
# file in case this file is being filtered multiple times.
if not os.path.exists(backup_filename):
shutil.copy(filename, backup_filename)
# Create a temporary file to read from. We cannot use backup_filename
# in case filter_file is invoked multiple times on the same file.
shutil.copy(filename, tmp_filename)
shutil.copy(path, temp_path)
errored = False
try:
# Open as a text file and filter until the end of the file is
# reached, or we found a marker in the line if it was specified
#
# To avoid translating line endings (\n to \r\n and vice-versa)
# we force os.open to ignore translations and use the line endings
# the file comes with
with open(tmp_filename, mode="r", errors="surrogateescape", newline="") as input_file:
with open(filename, mode="w", errors="surrogateescape", newline="") as output_file:
do_filtering = start_at is None
# Using iter and readline is a workaround needed not to
# disable input_file.tell(), which will happen if we call
# input_file.next() implicitly via the for loop
for line in iter(input_file.readline, ""):
if stop_at is not None:
current_position = input_file.tell()
# Open as a text file and filter until the end of the file is reached, or we found a
# marker in the line if it was specified. To avoid translating line endings (\n to
# \r\n and vice-versa) use newline="".
with open(
temp_path, mode="r", errors="surrogateescape", newline="", encoding=encoding
) as input_file, open(
path, mode="w", errors="surrogateescape", newline="", encoding=encoding
) as output_file:
if start_at is None and stop_at is None: # common case, avoids branching in loop
for line in input_file:
output_file.write(re.sub(regex_compiled, repl, line))
else:
# state is -1 before start_at; 0 between; 1 after stop_at
state = 0 if start_at is None else -1
for line in input_file:
if state == 0:
if stop_at == line.strip():
output_file.write(line)
break
if do_filtering:
filtered_line = re.sub(regex, repl, line)
output_file.write(filtered_line)
state = 1
else:
do_filtering = start_at == line.strip()
line = re.sub(regex_compiled, repl, line)
elif state == -1 and start_at == line.strip():
state = 0
output_file.write(line)
else:
current_position = None
# If we stopped filtering at some point, reopen the file in
# binary mode and copy verbatim the remaining part
if current_position and stop_at:
with open(tmp_filename, mode="rb") as input_binary_buffer:
input_binary_buffer.seek(current_position)
with open(filename, mode="ab") as output_binary_buffer:
output_binary_buffer.writelines(input_binary_buffer.readlines())
except BaseException:
# clean up the original file on failure.
shutil.move(backup_filename, filename)
# restore the original file
os.rename(temp_path, path)
errored = True
raise
finally:
os.remove(tmp_filename)
if not backup and os.path.exists(backup_filename):
os.remove(backup_filename)
if not errored and not backup:
os.unlink(temp_path)
class FileFilter: