avoid double closing of fd in sub-processes (#47035)
Both `multiprocessing.connection.Connection.__del__` and `io.IOBase.__del__` called `os.close` on the same file descriptor. As of Python 3.13, this is an explicit warning. Ensure we close once by usef `os.fdopen(..., closefd=False)`
This commit is contained in:
parent
a07d42d35b
commit
275d1d88f4
@ -348,7 +348,19 @@ def close(self):
|
|||||||
class MultiProcessFd:
|
class MultiProcessFd:
|
||||||
"""Return an object which stores a file descriptor and can be passed as an
|
"""Return an object which stores a file descriptor and can be passed as an
|
||||||
argument to a function run with ``multiprocessing.Process``, such that
|
argument to a function run with ``multiprocessing.Process``, such that
|
||||||
the file descriptor is available in the subprocess."""
|
the file descriptor is available in the subprocess. It provides access via
|
||||||
|
the `fd` property.
|
||||||
|
|
||||||
|
This object takes control over the associated FD: files opened from this
|
||||||
|
using `fdopen` need to use `closefd=False`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# As for why you have to fdopen(..., closefd=False): when a
|
||||||
|
# multiprocessing.connection.Connection object stores an fd, it assumes
|
||||||
|
# control over it, and will attempt to close it when gc'ed during __del__;
|
||||||
|
# if you fdopen(multiprocessfd.fd, closefd=True) then the resulting file
|
||||||
|
# will also assume control, and you can see warnings when there is an
|
||||||
|
# attempted double close.
|
||||||
|
|
||||||
def __init__(self, fd):
|
def __init__(self, fd):
|
||||||
self._connection = None
|
self._connection = None
|
||||||
@ -361,33 +373,20 @@ def __init__(self, fd):
|
|||||||
@property
|
@property
|
||||||
def fd(self):
|
def fd(self):
|
||||||
if self._connection:
|
if self._connection:
|
||||||
return self._connection._handle
|
return self._connection.fileno()
|
||||||
else:
|
else:
|
||||||
return self._fd
|
return self._fd
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
|
"""Rather than `.close()`ing any file opened from the associated
|
||||||
|
`.fd`, the `MultiProcessFd` should be closed with this.
|
||||||
|
"""
|
||||||
if self._connection:
|
if self._connection:
|
||||||
self._connection.close()
|
self._connection.close()
|
||||||
else:
|
else:
|
||||||
os.close(self._fd)
|
os.close(self._fd)
|
||||||
|
|
||||||
|
|
||||||
def close_connection_and_file(multiprocess_fd, file):
|
|
||||||
# MultiprocessFd is intended to transmit a FD
|
|
||||||
# to a child process, this FD is then opened to a Python File object
|
|
||||||
# (using fdopen). In >= 3.8, MultiprocessFd encapsulates a
|
|
||||||
# multiprocessing.connection.Connection; Connection closes the FD
|
|
||||||
# when it is deleted, and prints a warning about duplicate closure if
|
|
||||||
# it is not explicitly closed. In < 3.8, MultiprocessFd encapsulates a
|
|
||||||
# simple FD; closing the FD here appears to conflict with
|
|
||||||
# closure of the File object (in < 3.8 that is). Therefore this needs
|
|
||||||
# to choose whether to close the File or the Connection.
|
|
||||||
if sys.version_info >= (3, 8):
|
|
||||||
multiprocess_fd.close()
|
|
||||||
else:
|
|
||||||
file.close()
|
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def replace_environment(env):
|
def replace_environment(env):
|
||||||
"""Replace the current environment (`os.environ`) with `env`.
|
"""Replace the current environment (`os.environ`) with `env`.
|
||||||
@ -932,10 +931,10 @@ def _writer_daemon(
|
|||||||
# 1. Use line buffering (3rd param = 1) since Python 3 has a bug
|
# 1. Use line buffering (3rd param = 1) since Python 3 has a bug
|
||||||
# that prevents unbuffered text I/O.
|
# that prevents unbuffered text I/O.
|
||||||
# 2. Python 3.x before 3.7 does not open with UTF-8 encoding by default
|
# 2. Python 3.x before 3.7 does not open with UTF-8 encoding by default
|
||||||
in_pipe = os.fdopen(read_multiprocess_fd.fd, "r", 1, encoding="utf-8")
|
in_pipe = os.fdopen(read_multiprocess_fd.fd, "r", 1, encoding="utf-8", closefd=False)
|
||||||
|
|
||||||
if stdin_multiprocess_fd:
|
if stdin_multiprocess_fd:
|
||||||
stdin = os.fdopen(stdin_multiprocess_fd.fd)
|
stdin = os.fdopen(stdin_multiprocess_fd.fd, closefd=False)
|
||||||
else:
|
else:
|
||||||
stdin = None
|
stdin = None
|
||||||
|
|
||||||
@ -1025,9 +1024,9 @@ def _writer_daemon(
|
|||||||
if isinstance(log_file, io.StringIO):
|
if isinstance(log_file, io.StringIO):
|
||||||
control_pipe.send(log_file.getvalue())
|
control_pipe.send(log_file.getvalue())
|
||||||
log_file_wrapper.close()
|
log_file_wrapper.close()
|
||||||
close_connection_and_file(read_multiprocess_fd, in_pipe)
|
read_multiprocess_fd.close()
|
||||||
if stdin_multiprocess_fd:
|
if stdin_multiprocess_fd:
|
||||||
close_connection_and_file(stdin_multiprocess_fd, stdin)
|
stdin_multiprocess_fd.close()
|
||||||
|
|
||||||
# send echo value back to the parent so it can be preserved.
|
# send echo value back to the parent so it can be preserved.
|
||||||
control_pipe.send(echo)
|
control_pipe.send(echo)
|
||||||
|
@ -1194,7 +1194,7 @@ def _setup_pkg_and_run(
|
|||||||
# that the parent process is not going to read from it till we
|
# that the parent process is not going to read from it till we
|
||||||
# are done with the child, so we undo Python's precaution.
|
# are done with the child, so we undo Python's precaution.
|
||||||
if input_multiprocess_fd is not None:
|
if input_multiprocess_fd is not None:
|
||||||
sys.stdin = os.fdopen(input_multiprocess_fd.fd)
|
sys.stdin = os.fdopen(input_multiprocess_fd.fd, closefd=False)
|
||||||
|
|
||||||
pkg = serialized_pkg.restore()
|
pkg = serialized_pkg.restore()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user