Replace MultiProcessFd with Connection objects

Connection objects are independent of the Python version, the platform, and
the multiprocessing start method, so it is better to use them than a mix of
plain file descriptors and inadequate guesses in the child process about
whether it was forked or not.

This also allows us to delete the now redundant MultiProcessFd class,
hopefully making things a bit easier to follow.
This commit is contained in:
Harmen Stoppels 2024-10-23 15:06:13 +02:00 committed by Todd Gamblin
parent ef220daaca
commit b63cbe4e6e
2 changed files with 68 additions and 134 deletions

View File

@ -10,7 +10,6 @@
import errno import errno
import io import io
import multiprocessing import multiprocessing
import multiprocessing.connection
import os import os
import re import re
import select import select
@ -19,9 +18,10 @@
import threading import threading
import traceback import traceback
from contextlib import contextmanager from contextlib import contextmanager
from multiprocessing.connection import Connection
from threading import Thread from threading import Thread
from types import ModuleType from types import ModuleType
from typing import Optional from typing import Callable, Optional
import llnl.util.tty as tty import llnl.util.tty as tty
@ -345,48 +345,6 @@ def close(self):
self.file.close() self.file.close()
class MultiProcessFd:
"""Return an object which stores a file descriptor and can be passed as an
argument to a function run with ``multiprocessing.Process``, such that
the file descriptor is available in the subprocess. It provides access via
the `fd` property.
This object takes control over the associated FD: files opened from this
using `fdopen` need to use `closefd=False`.
"""
# As for why you have to fdopen(..., closefd=False): when a
# multiprocessing.connection.Connection object stores an fd, it assumes
# control over it, and will attempt to close it when gc'ed during __del__;
# if you fdopen(multiprocessfd.fd, closefd=True) then the resulting file
# will also assume control, and you can see warnings when there is an
# attempted double close.
def __init__(self, fd):
self._connection = None
self._fd = None
if sys.version_info >= (3, 8):
self._connection = multiprocessing.connection.Connection(fd)
else:
self._fd = fd
@property
def fd(self):
if self._connection:
return self._connection.fileno()
else:
return self._fd
def close(self):
"""Rather than `.close()`ing any file opened from the associated
`.fd`, the `MultiProcessFd` should be closed with this.
"""
if self._connection:
self._connection.close()
else:
os.close(self._fd)
@contextmanager @contextmanager
def replace_environment(env): def replace_environment(env):
"""Replace the current environment (`os.environ`) with `env`. """Replace the current environment (`os.environ`) with `env`.
@ -545,9 +503,7 @@ def __enter__(self):
self._saved_debug = tty._debug self._saved_debug = tty._debug
# OS-level pipe for redirecting output to logger # OS-level pipe for redirecting output to logger
read_fd, write_fd = os.pipe() read_fd, write_fd = multiprocessing.Pipe(duplex=False)
read_multiprocess_fd = MultiProcessFd(read_fd)
# Multiprocessing pipe for communication back from the daemon # Multiprocessing pipe for communication back from the daemon
# Currently only used to save echo value between uses # Currently only used to save echo value between uses
@ -556,10 +512,10 @@ def __enter__(self):
# Sets a daemon that writes to file what it reads from a pipe # Sets a daemon that writes to file what it reads from a pipe
try: try:
# need to pass this b/c multiprocessing closes stdin in child. # need to pass this b/c multiprocessing closes stdin in child.
input_multiprocess_fd = None input_fd = None
try: try:
if sys.stdin.isatty(): if sys.stdin.isatty():
input_multiprocess_fd = MultiProcessFd(os.dup(sys.stdin.fileno())) input_fd = Connection(os.dup(sys.stdin.fileno()))
except BaseException: except BaseException:
# just don't forward input if this fails # just don't forward input if this fails
pass pass
@ -568,8 +524,8 @@ def __enter__(self):
self.process = multiprocessing.Process( self.process = multiprocessing.Process(
target=_writer_daemon, target=_writer_daemon,
args=( args=(
input_multiprocess_fd, input_fd,
read_multiprocess_fd, read_fd,
write_fd, write_fd,
self.echo, self.echo,
self.log_file, self.log_file,
@ -581,9 +537,9 @@ def __enter__(self):
self.process.start() self.process.start()
finally: finally:
if input_multiprocess_fd: if input_fd:
input_multiprocess_fd.close() input_fd.close()
read_multiprocess_fd.close() read_fd.close()
# Flush immediately before redirecting so that anything buffered # Flush immediately before redirecting so that anything buffered
# goes to the original stream # goes to the original stream
@ -601,9 +557,9 @@ def __enter__(self):
self._saved_stderr = os.dup(sys.stderr.fileno()) self._saved_stderr = os.dup(sys.stderr.fileno())
# redirect to the pipe we created above # redirect to the pipe we created above
os.dup2(write_fd, sys.stdout.fileno()) os.dup2(write_fd.fileno(), sys.stdout.fileno())
os.dup2(write_fd, sys.stderr.fileno()) os.dup2(write_fd.fileno(), sys.stderr.fileno())
os.close(write_fd) write_fd.close()
else: else:
# Handle I/O the Python way. This won't redirect lower-level # Handle I/O the Python way. This won't redirect lower-level
@ -616,7 +572,7 @@ def __enter__(self):
self._saved_stderr = sys.stderr self._saved_stderr = sys.stderr
# create a file object for the pipe; redirect to it. # create a file object for the pipe; redirect to it.
pipe_fd_out = os.fdopen(write_fd, "w") pipe_fd_out = os.fdopen(write_fd.fileno(), "w", closefd=False)
sys.stdout = pipe_fd_out sys.stdout = pipe_fd_out
sys.stderr = pipe_fd_out sys.stderr = pipe_fd_out
@ -865,14 +821,14 @@ def force_echo(self):
def _writer_daemon( def _writer_daemon(
stdin_multiprocess_fd, stdin_fd: Optional[Connection],
read_multiprocess_fd, read_fd: Connection,
write_fd, write_fd: Connection,
echo, echo: bool,
log_file_wrapper, log_file_wrapper: FileWrapper,
control_pipe, control_fd: Connection,
filter_fn, filter_fn: Optional[Callable[[str], str]],
): ) -> None:
"""Daemon used by ``log_output`` to write to a log file and to ``stdout``. """Daemon used by ``log_output`` to write to a log file and to ``stdout``.
The daemon receives output from the parent process and writes it both The daemon receives output from the parent process and writes it both
@ -909,43 +865,37 @@ def _writer_daemon(
``StringIO`` in the parent. This is mainly for testing. ``StringIO`` in the parent. This is mainly for testing.
Arguments: Arguments:
stdin_multiprocess_fd (int): input from the terminal stdin_fd: optional input from the terminal
read_multiprocess_fd (int): pipe for reading from parent's redirected read_fd: pipe for reading from parent's redirected stdout
stdout echo: initial echo setting -- controlled by user and preserved across multiple writer
echo (bool): initial echo setting -- controlled by user and daemons
preserved across multiple writer daemons log_file_wrapper: file to log all output
log_file_wrapper (FileWrapper): file to log all output control_pipe: multiprocessing pipe on which to send control information to the parent
control_pipe (Pipe): multiprocessing pipe on which to send control filter_fn: optional function to filter each line of output
information to the parent
filter_fn (callable, optional): function to filter each line of output
""" """
# If this process was forked, then it will inherit file descriptors from # This process depends on closing all instances of write_pipe to terminate the reading loop
# the parent process. This process depends on closing all instances of write_fd.close()
# write_fd to terminate the reading loop, so we close the file descriptor
# here. Forking is the process spawning method everywhere except Mac OS
# for Python >= 3.8 and on Windows
if sys.version_info < (3, 8) or sys.platform != "darwin":
os.close(write_fd)
# 1. Use line buffering (3rd param = 1) since Python 3 has a bug # 1. Use line buffering (3rd param = 1) since Python 3 has a bug
# that prevents unbuffered text I/O. # that prevents unbuffered text I/O.
# 2. Python 3.x before 3.7 does not open with UTF-8 encoding by default # 2. Python 3.x before 3.7 does not open with UTF-8 encoding by default
in_pipe = os.fdopen(read_multiprocess_fd.fd, "r", 1, encoding="utf-8", closefd=False) # 3. closefd=False because Connection has "ownership"
read_file = os.fdopen(read_fd.fileno(), "r", 1, encoding="utf-8", closefd=False)
if stdin_multiprocess_fd: if stdin_fd:
stdin = os.fdopen(stdin_multiprocess_fd.fd, closefd=False) stdin_file = os.fdopen(stdin_fd.fileno(), closefd=False)
else: else:
stdin = None stdin_file = None
# list of streams to select from # list of streams to select from
istreams = [in_pipe, stdin] if stdin else [in_pipe] istreams = [read_file, stdin_file] if stdin_file else [read_file]
force_echo = False # parent can force echo for certain output force_echo = False # parent can force echo for certain output
log_file = log_file_wrapper.unwrap() log_file = log_file_wrapper.unwrap()
try: try:
with keyboard_input(stdin) as kb: with keyboard_input(stdin_file) as kb:
while True: while True:
# fix the terminal settings if we recently came to # fix the terminal settings if we recently came to
# the foreground # the foreground
@ -958,12 +908,12 @@ def _writer_daemon(
# Allow user to toggle echo with 'v' key. # Allow user to toggle echo with 'v' key.
# Currently ignores other chars. # Currently ignores other chars.
# only read stdin if we're in the foreground # only read stdin if we're in the foreground
if stdin in rlist and not _is_background_tty(stdin): if stdin_file and stdin_file in rlist and not _is_background_tty(stdin_file):
# it's possible to be backgrounded between the above # it's possible to be backgrounded between the above
# check and the read, so we ignore SIGTTIN here. # check and the read, so we ignore SIGTTIN here.
with ignore_signal(signal.SIGTTIN): with ignore_signal(signal.SIGTTIN):
try: try:
if stdin.read(1) == "v": if stdin_file.read(1) == "v":
echo = not echo echo = not echo
except IOError as e: except IOError as e:
# If SIGTTIN is ignored, the system gives EIO # If SIGTTIN is ignored, the system gives EIO
@ -972,13 +922,13 @@ def _writer_daemon(
if e.errno != errno.EIO: if e.errno != errno.EIO:
raise raise
if in_pipe in rlist: if read_file in rlist:
line_count = 0 line_count = 0
try: try:
while line_count < 100: while line_count < 100:
# Handle output from the calling process. # Handle output from the calling process.
try: try:
line = _retry(in_pipe.readline)() line = _retry(read_file.readline)()
except UnicodeDecodeError: except UnicodeDecodeError:
# installs like --test=root gpgme produce non-UTF8 logs # installs like --test=root gpgme produce non-UTF8 logs
line = "<line lost: output was not encoded as UTF-8>\n" line = "<line lost: output was not encoded as UTF-8>\n"
@ -1007,7 +957,7 @@ def _writer_daemon(
if xoff in controls: if xoff in controls:
force_echo = False force_echo = False
if not _input_available(in_pipe): if not _input_available(read_file):
break break
finally: finally:
if line_count > 0: if line_count > 0:
@ -1022,14 +972,14 @@ def _writer_daemon(
finally: finally:
# send written data back to parent if we used a StringIO # send written data back to parent if we used a StringIO
if isinstance(log_file, io.StringIO): if isinstance(log_file, io.StringIO):
control_pipe.send(log_file.getvalue()) control_fd.send(log_file.getvalue())
log_file_wrapper.close() log_file_wrapper.close()
read_multiprocess_fd.close() read_fd.close()
if stdin_multiprocess_fd: if stdin_fd:
stdin_multiprocess_fd.close() stdin_fd.close()
# send echo value back to the parent so it can be preserved. # send echo value back to the parent so it can be preserved.
control_pipe.send(echo) control_fd.send(echo)
def _retry(function): def _retry(function):

View File

@ -44,6 +44,7 @@
from collections import defaultdict from collections import defaultdict
from enum import Flag, auto from enum import Flag, auto
from itertools import chain from itertools import chain
from multiprocessing.connection import Connection
from typing import Callable, Dict, List, Optional, Set, Tuple from typing import Callable, Dict, List, Optional, Set, Tuple
import archspec.cpu import archspec.cpu
@ -54,7 +55,6 @@
from llnl.util.lang import dedupe, stable_partition from llnl.util.lang import dedupe, stable_partition
from llnl.util.symlink import symlink from llnl.util.symlink import symlink
from llnl.util.tty.color import cescape, colorize from llnl.util.tty.color import cescape, colorize
from llnl.util.tty.log import MultiProcessFd
import spack.build_systems._checks import spack.build_systems._checks
import spack.build_systems.cmake import spack.build_systems.cmake
@ -1143,10 +1143,10 @@ def _setup_pkg_and_run(
serialized_pkg: "spack.subprocess_context.PackageInstallContext", serialized_pkg: "spack.subprocess_context.PackageInstallContext",
function: Callable, function: Callable,
kwargs: Dict, kwargs: Dict,
write_pipe: multiprocessing.connection.Connection, write_pipe: Connection,
input_multiprocess_fd: Optional[MultiProcessFd], input_pipe: Optional[Connection],
jsfd1: Optional[MultiProcessFd], jsfd1: Optional[Connection],
jsfd2: Optional[MultiProcessFd], jsfd2: Optional[Connection],
): ):
"""Main entry point in the child process for Spack builds. """Main entry point in the child process for Spack builds.
@ -1188,13 +1188,12 @@ def _setup_pkg_and_run(
context: str = kwargs.get("context", "build") context: str = kwargs.get("context", "build")
try: try:
# We are in the child process. Python sets sys.stdin to # We are in the child process. Python sets sys.stdin to open(os.devnull) to prevent our
# open(os.devnull) to prevent our process and its parent from # process and its parent from simultaneously reading from the original stdin. But, we
# simultaneously reading from the original stdin. But, we assume # assume that the parent process is not going to read from it till we are done with the
# that the parent process is not going to read from it till we # child, so we undo Python's precaution. closefd=False since Connection has ownership.
# are done with the child, so we undo Python's precaution. if input_pipe is not None:
if input_multiprocess_fd is not None: sys.stdin = os.fdopen(input_pipe.fileno(), closefd=False)
sys.stdin = os.fdopen(input_multiprocess_fd.fd, closefd=False)
pkg = serialized_pkg.restore() pkg = serialized_pkg.restore()
@ -1263,8 +1262,8 @@ def _setup_pkg_and_run(
finally: finally:
write_pipe.close() write_pipe.close()
if input_multiprocess_fd is not None: if input_pipe is not None:
input_multiprocess_fd.close() input_pipe.close()
def start_build_process(pkg, function, kwargs): def start_build_process(pkg, function, kwargs):
@ -1291,23 +1290,9 @@ def child_fun():
If something goes wrong, the child process catches the error and If something goes wrong, the child process catches the error and
passes it to the parent wrapped in a ChildError. The parent is passes it to the parent wrapped in a ChildError. The parent is
expected to handle (or re-raise) the ChildError. expected to handle (or re-raise) the ChildError.
This uses `multiprocessing.Process` to create the child process. The
mechanism used to create the process differs on different operating
systems and for different versions of Python. In some cases "fork"
is used (i.e. the "fork" system call) and some cases it starts an
entirely new Python interpreter process (in the docs this is referred
to as the "spawn" start method). Breaking it down by OS:
- Linux always uses fork.
- Mac OS uses fork before Python 3.8 and "spawn" for 3.8 and after.
- Windows always uses the "spawn" start method.
For more information on `multiprocessing` child process creation
mechanisms, see https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
""" """
read_pipe, write_pipe = multiprocessing.Pipe(duplex=False) read_pipe, write_pipe = multiprocessing.Pipe(duplex=False)
input_multiprocess_fd = None input_fd = None
jobserver_fd1 = None jobserver_fd1 = None
jobserver_fd2 = None jobserver_fd2 = None
@ -1316,14 +1301,13 @@ def child_fun():
try: try:
# Forward sys.stdin when appropriate, to allow toggling verbosity # Forward sys.stdin when appropriate, to allow toggling verbosity
if sys.platform != "win32" and sys.stdin.isatty() and hasattr(sys.stdin, "fileno"): if sys.platform != "win32" and sys.stdin.isatty() and hasattr(sys.stdin, "fileno"):
input_fd = os.dup(sys.stdin.fileno()) input_fd = Connection(os.dup(sys.stdin.fileno()))
input_multiprocess_fd = MultiProcessFd(input_fd)
mflags = os.environ.get("MAKEFLAGS", False) mflags = os.environ.get("MAKEFLAGS", False)
if mflags: if mflags:
m = re.search(r"--jobserver-[^=]*=(\d),(\d)", mflags) m = re.search(r"--jobserver-[^=]*=(\d),(\d)", mflags)
if m: if m:
jobserver_fd1 = MultiProcessFd(int(m.group(1))) jobserver_fd1 = Connection(int(m.group(1)))
jobserver_fd2 = MultiProcessFd(int(m.group(2))) jobserver_fd2 = Connection(int(m.group(2)))
p = multiprocessing.Process( p = multiprocessing.Process(
target=_setup_pkg_and_run, target=_setup_pkg_and_run,
@ -1332,7 +1316,7 @@ def child_fun():
function, function,
kwargs, kwargs,
write_pipe, write_pipe,
input_multiprocess_fd, input_fd,
jobserver_fd1, jobserver_fd1,
jobserver_fd2, jobserver_fd2,
), ),
@ -1352,8 +1336,8 @@ def child_fun():
finally: finally:
# Close the input stream in the parent process # Close the input stream in the parent process
if input_multiprocess_fd is not None: if input_fd is not None:
input_multiprocess_fd.close() input_fd.close()
def exitcode_msg(p): def exitcode_msg(p):
typ = "exit" if p.exitcode >= 0 else "signal" typ = "exit" if p.exitcode >= 0 else "signal"