From f124409d8a969d2c1aee06a049ae857e56e1c10b Mon Sep 17 00:00:00 2001 From: Harmen Stoppels Date: Thu, 12 Dec 2024 20:07:39 +0100 Subject: [PATCH] filter_file: fix various bugs (#48038) * `f.tell` on a `TextIOWrapper` does not return the offset in bytes, but an opaque integer that can only be used for `f.seek` on the same object. Spack assumes it's a byte offset. * Do not open in a locale dependent way, but assume utf-8 (and allow users to override that) * Use tempfile to generate a backup/temporary file in a safe way * Comparison between None and str is valid and on purpose. --- lib/spack/llnl/util/filesystem.py | 135 +++++++++++++----------------- 1 file changed, 57 insertions(+), 78 deletions(-) diff --git a/lib/spack/llnl/util/filesystem.py b/lib/spack/llnl/util/filesystem.py index 4732924572c..17a8bb203aa 100644 --- a/lib/spack/llnl/util/filesystem.py +++ b/lib/spack/llnl/util/filesystem.py @@ -301,35 +301,32 @@ def filter_file( ignore_absent: bool = False, start_at: Optional[str] = None, stop_at: Optional[str] = None, + encoding: Optional[str] = "utf-8", ) -> None: r"""Like sed, but uses python regular expressions. - Filters every line of each file through regex and replaces the file - with a filtered version. Preserves mode of filtered files. + Filters every line of each file through regex and replaces the file with a filtered version. + Preserves mode of filtered files. - As with re.sub, ``repl`` can be either a string or a callable. - If it is a callable, it is passed the match object and should - return a suitable replacement string. If it is a string, it - can contain ``\1``, ``\2``, etc. to represent back-substitution - as sed would allow. + As with re.sub, ``repl`` can be either a string or a callable. If it is a callable, it is + passed the match object and should return a suitable replacement string. If it is a string, it + can contain ``\1``, ``\2``, etc. to represent back-substitution as sed would allow. Args: - regex (str): The regular expression to search for - repl (str): The string to replace matches with - *filenames: One or more files to search and replace - string (bool): Treat regex as a plain string. Default it False - backup (bool): Make backup file(s) suffixed with ``~``. Default is False - ignore_absent (bool): Ignore any files that don't exist. - Default is False - start_at (str): Marker used to start applying the replacements. If a - text line matches this marker filtering is started at the next line. - All contents before the marker and the marker itself are copied - verbatim. Default is to start filtering from the first line of the - file. - stop_at (str): Marker used to stop scanning the file further. If a text - line matches this marker filtering is stopped and the rest of the - file is copied verbatim. Default is to filter until the end of the - file. + regex: The regular expression to search for + repl: The string to replace matches with + *filenames: One or more files to search and replace string: Treat regex as a plain string. + Default it False backup: Make backup file(s) suffixed with ``~``. Default is False + ignore_absent: Ignore any files that don't exist. Default is False + start_at: Marker used to start applying the replacements. If a text line matches this + marker filtering is started at the next line. All contents before the marker and the + marker itself are copied verbatim. Default is to start filtering from the first line of + the file. + stop_at: Marker used to stop scanning the file further. If a text line matches this marker + filtering is stopped and the rest of the file is copied verbatim. Default is to filter + until the end of the file. + encoding: The encoding to use when reading and writing the files. Default is None, which + uses the system's default encoding. """ # Allow strings to use \1, \2, etc. for replacement, like sed if not callable(repl): @@ -345,72 +342,54 @@ def groupid_to_group(x): if string: regex = re.escape(regex) - for filename in path_to_os_path(*filenames): - msg = 'FILTER FILE: {0} [replacing "{1}"]' - tty.debug(msg.format(filename, regex)) + regex_compiled = re.compile(regex) + for path in path_to_os_path(*filenames): + fd, temp_path = tempfile.mkstemp(prefix=os.path.basename(path), dir=os.path.dirname(path)) + os.close(fd) - backup_filename = filename + "~" - tmp_filename = filename + ".spack~" - - if ignore_absent and not os.path.exists(filename): - msg = 'FILTER FILE: file "{0}" not found. Skipping to next file.' - tty.debug(msg.format(filename)) + if ignore_absent and not os.path.exists(path): + tty.debug(f'FILTER FILE: file "{path}" not found. Skipping to next file.') continue + else: + tty.debug(f'FILTER FILE: {path} [replacing "{regex}"]') - # Create backup file. Don't overwrite an existing backup - # file in case this file is being filtered multiple times. - if not os.path.exists(backup_filename): - shutil.copy(filename, backup_filename) - - # Create a temporary file to read from. We cannot use backup_filename - # in case filter_file is invoked multiple times on the same file. - shutil.copy(filename, tmp_filename) + shutil.copy(path, temp_path) + errored = False try: - # Open as a text file and filter until the end of the file is - # reached, or we found a marker in the line if it was specified - # - # To avoid translating line endings (\n to \r\n and vice-versa) - # we force os.open to ignore translations and use the line endings - # the file comes with - with open(tmp_filename, mode="r", errors="surrogateescape", newline="") as input_file: - with open(filename, mode="w", errors="surrogateescape", newline="") as output_file: - do_filtering = start_at is None - # Using iter and readline is a workaround needed not to - # disable input_file.tell(), which will happen if we call - # input_file.next() implicitly via the for loop - for line in iter(input_file.readline, ""): - if stop_at is not None: - current_position = input_file.tell() + # Open as a text file and filter until the end of the file is reached, or we found a + # marker in the line if it was specified. To avoid translating line endings (\n to + # \r\n and vice-versa) use newline="". + with open( + temp_path, mode="r", errors="surrogateescape", newline="", encoding=encoding + ) as input_file, open( + path, mode="w", errors="surrogateescape", newline="", encoding=encoding + ) as output_file: + if start_at is None and stop_at is None: # common case, avoids branching in loop + for line in input_file: + output_file.write(re.sub(regex_compiled, repl, line)) + else: + # state is -1 before start_at; 0 between; 1 after stop_at + state = 0 if start_at is None else -1 + for line in input_file: + if state == 0: if stop_at == line.strip(): - output_file.write(line) - break - if do_filtering: - filtered_line = re.sub(regex, repl, line) - output_file.write(filtered_line) - else: - do_filtering = start_at == line.strip() - output_file.write(line) - else: - current_position = None - - # If we stopped filtering at some point, reopen the file in - # binary mode and copy verbatim the remaining part - if current_position and stop_at: - with open(tmp_filename, mode="rb") as input_binary_buffer: - input_binary_buffer.seek(current_position) - with open(filename, mode="ab") as output_binary_buffer: - output_binary_buffer.writelines(input_binary_buffer.readlines()) + state = 1 + else: + line = re.sub(regex_compiled, repl, line) + elif state == -1 and start_at == line.strip(): + state = 0 + output_file.write(line) except BaseException: - # clean up the original file on failure. - shutil.move(backup_filename, filename) + # restore the original file + os.rename(temp_path, path) + errored = True raise finally: - os.remove(tmp_filename) - if not backup and os.path.exists(backup_filename): - os.remove(backup_filename) + if not errored and not backup: + os.unlink(temp_path) class FileFilter: