spack install terminal output handling in foreground/background (#15723)

Makes the following changes:

* (Fixes #15620) tty configuration was failing when stdout was 
  redirected. The implementation now creates a pseudo terminal for
  stdin and checks stdout properly, so redirections of stdin/out/err
  should be handled now.
* Handles terminal configuration when the Spack process moves between
  the foreground and background (possibly multiple times) during a
  build.
* Because Spack adjusts terminal settings to let users enable/disable
  build process output to the terminal using a "v" toggle, abnormal
  exit cases (like Ctrl-C) could leave the terminal in an unusable
  state. This is addressed here with a special-case handler that
  restores terminal settings.

Significantly extend testing of process output logger:

* New PseudoShell object for setting up a master and child process
  and configuring file descriptor inheritance between the two
* Tests for "v" verbosity toggle making use of the added PseudoShell
  object
* Added `uniq` function which takes a list of elements and replaces
  any consecutive sequence of duplicate elements with a single
  instance (e.g. "112211" -> "121")

Co-authored-by: Adam J. Stewart <ajstewart426@gmail.com>
Todd Gamblin 2020-04-15 11:05:41 -07:00
parent 94aa368af8
commit a563884af3
7 changed files with 1153 additions and 222 deletions


@@ -624,9 +624,9 @@ def replace_directory_transaction(directory_name, tmp_root=None):
# Check the input is indeed a directory with absolute path.
# Raise before anything is done to avoid moving the wrong directory
assert os.path.isdir(directory_name), \
'"directory_name" must be a valid directory'
'Invalid directory: ' + directory_name
assert os.path.isabs(directory_name), \
'"directory_name" must contain an absolute path'
'"directory_name" must contain an absolute path: ' + directory_name
directory_basename = os.path.basename(directory_name)


@@ -619,3 +619,28 @@ def load_module_from_file(module_name, module_path):
import imp
module = imp.load_source(module_name, module_path)
return module
def uniq(sequence):
"""Remove strings of duplicate elements from a list.
This works like the command-line ``uniq`` tool. It filters strings
of duplicate elements in a list. Adjacent matching elements are
merged into the first occurrence.
For example::
uniq([1, 1, 1, 1, 2, 2, 2, 3, 3]) == [1, 2, 3]
uniq([1, 1, 1, 1, 2, 2, 2, 1, 1]) == [1, 2, 1]
"""
if not sequence:
return []
uniq_list = [sequence[0]]
last = sequence[0]
for element in sequence[1:]:
if element != last:
uniq_list.append(element)
last = element
return uniq_list
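
For context, a quick interactive check of ``uniq`` (the import path
matches the module changed above)::

    from llnl.util.lang import uniq

    assert uniq([1, 1, 1, 1, 2, 2, 2, 3, 3]) == [1, 2, 3]
    assert uniq([1, 1, 1, 1, 2, 2, 2, 1, 1]) == [1, 2, 1]
    assert uniq([]) == []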


@@ -7,6 +7,8 @@
"""
from __future__ import unicode_literals
import atexit
import errno
import multiprocessing
import os
import re
@@ -25,6 +27,7 @@
except ImportError:
termios = None
# Use this to strip escape sequences
_escape = re.compile(r'\x1b[^m]*m|\x1b\[?1034h')
@@ -38,17 +41,22 @@
@contextmanager
def background_safe():
signal.signal(signal.SIGTTOU, signal.SIG_IGN)
yield
signal.signal(signal.SIGTTOU, signal.SIG_DFL)
def ignore_signal(signum):
"""Context manager to temporarily ignore a signal."""
old_handler = signal.signal(signum, signal.SIG_IGN)
try:
yield
finally:
signal.signal(signum, old_handler)
def _is_background_tty():
"""Return True iff this process is backgrounded and stdout is a tty"""
if sys.stdout.isatty():
return os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
return False # not writing to tty, not background
def _is_background_tty(stream):
"""True if the stream is a tty and calling process is in the background.
"""
return (
stream.isatty() and
os.getpgrp() != os.tcgetpgrp(stream.fileno())
)
def _strip(line):
@@ -56,27 +64,80 @@ def _strip(line):
return _escape.sub('', line)
class _keyboard_input(object):
class keyboard_input(object):
"""Context manager to disable line editing and echoing.
Use this with ``sys.stdin`` for keyboard input, e.g.::
with keyboard_input(sys.stdin):
r, w, x = select.select([sys.stdin], [], [])
# ... do something with keypresses ...
with keyboard_input(sys.stdin) as kb:
while True:
kb.check_fg_bg()
r, w, x = select.select([sys.stdin], [], [])
# ... do something with keypresses ...
This disables canonical input so that keypresses are available on the
stream immediately. Typically standard input allows line editing,
which means keypresses won't be sent until the user hits return.
The ``keyboard_input`` context manager disables canonical
(line-based) input and echoing, so that keypresses are available on
the stream immediately, and they are not printed to the
terminal. Typically, standard input is line-buffered, which means
keypresses won't be sent until the user hits return. In this mode, a
user can hit, e.g., 'v', and it will be read on the other end of the
pipe immediately but not printed.
It also disables echoing, so that keys pressed aren't printed to the
terminal. So, the user can hit, e.g., 'v', and it's read on the
other end of the pipe immediately but not printed.
The handler takes care to ensure that terminal changes only take
effect when the calling process is in the foreground. If the process
is backgrounded, canonical mode and echo are re-enabled. They are
disabled again when the calling process comes back to the foreground.
When the with block completes, prior TTY settings are restored.
This context manager works through a single signal handler for
``SIGTSTP``, along with a polling routine called ``check_fg_bg()``.
Here are the relevant states, transitions, and POSIX signals::
[Running] -------- Ctrl-Z sends SIGTSTP ------------.
[ in FG ] <------- fg sends SIGCONT --------------. |
^ | |
| fg (no signal) | |
| | v
[Running] <------- bg sends SIGCONT ---------- [Stopped]
[ in BG ] [ in BG ]
We handle all transitions except for ``SIGTSTP`` generated by Ctrl-Z
by periodically calling ``check_fg_bg()``. This routine notices if
we are in the foreground with canonical mode or echo still enabled,
or in the background with either of them disabled, and it fixes the
terminal settings in response.
``check_fg_bg()`` works *except* for when the process is stopped with
``SIGTSTP``. We cannot rely on a periodic timer in this case, as it
may not run before the process stops. We therefore restore terminal
settings in the ``SIGTSTP`` handler.
Additional notes:
* We mostly use polling here instead of a SIGALRM timer or a
thread. This is to avoid the complexities of many interrupts, which
seem to make system calls (like I/O) unreliable in older Python
versions (2.6 and 2.7). See these issues for details:
1. https://www.python.org/dev/peps/pep-0475/
2. https://bugs.python.org/issue8354
There are essentially too many ways for asynchronous signals to go
wrong if we also have to support older Python versions, so we opt
not to use them.
* ``SIGSTOP`` can stop a process (in the foreground or background),
but it can't be caught. Because of this, we can't fix any terminal
settings on ``SIGSTOP``, and the terminal will be left with
``ICANON`` and ``ECHO`` disabled until it resumes execution.
* Technically, a process *could* be sent ``SIGTSTP`` while running in
the foreground, without the shell backgrounding that process. This
doesn't happen in practice, and we assume that ``SIGTSTP`` always
means that defaults should be restored.
* We rely on ``termios`` support. Without it, or if the stream isn't
a TTY, ``keyboard_input`` has no effect.
Note: this depends on termios support. If termios isn't available,
or if the stream isn't a TTY, this context manager has no effect.
"""
def __init__(self, stream):
"""Create a context manager that will enable keyboard input on stream.
@ -89,42 +150,97 @@ def __init__(self, stream):
"""
self.stream = stream
def _is_background(self):
"""True iff calling process is in the background."""
return _is_background_tty(self.stream)
def _get_canon_echo_flags(self):
"""Get current termios canonical and echo settings."""
cfg = termios.tcgetattr(self.stream)
return (
bool(cfg[3] & termios.ICANON),
bool(cfg[3] & termios.ECHO),
)
def _enable_keyboard_input(self):
"""Disable canonical input and echoing on ``self.stream``."""
# "enable" input by disabling canonical mode and echo
new_cfg = termios.tcgetattr(self.stream)
new_cfg[3] &= ~termios.ICANON
new_cfg[3] &= ~termios.ECHO
# Apply new settings for terminal
with ignore_signal(signal.SIGTTOU):
termios.tcsetattr(self.stream, termios.TCSANOW, new_cfg)
def _restore_default_terminal_settings(self):
"""Restore the original input configuration on ``self.stream``."""
# _restore_default_terminal_settings can be called in the foreground
# or background. When called in the background, tcsetattr triggers
# SIGTTOU, which we must ignore, or the process will be stopped.
with ignore_signal(signal.SIGTTOU):
termios.tcsetattr(self.stream, termios.TCSANOW, self.old_cfg)
def _tstp_handler(self, signum, frame):
self._restore_default_terminal_settings()
os.kill(os.getpid(), signal.SIGSTOP)
def check_fg_bg(self):
# old_cfg is set up in __enter__ and indicates that we have
# termios and a valid stream.
if not self.old_cfg:
return
# query terminal flags and fg/bg status
flags = self._get_canon_echo_flags()
bg = self._is_background()
# restore sanity if flags are amiss -- see diagram in class docs
if not bg and any(flags): # fg, but input not enabled
self._enable_keyboard_input()
elif bg and not all(flags): # bg, but input enabled
self._restore_default_terminal_settings()
def __enter__(self):
"""Enable immediate keypress input on stream.
"""Enable immediate keypress input, while this process is foreground.
If the stream is not a TTY or the system doesn't support termios,
do nothing.
"""
self.old_cfg = None
self.old_handlers = {}
# Ignore all this if the input stream is not a tty.
if not self.stream or not self.stream.isatty():
return
return self
# If this fails, self.old_cfg will remain None
if termios and not _is_background_tty():
# save old termios settings
old_cfg = termios.tcgetattr(self.stream)
if termios:
# save old termios settings to restore later
self.old_cfg = termios.tcgetattr(self.stream)
try:
# create new settings with canonical input and echo
# disabled, so keypresses are immediate & don't echo.
self.new_cfg = termios.tcgetattr(self.stream)
self.new_cfg[3] &= ~termios.ICANON
self.new_cfg[3] &= ~termios.ECHO
# Install a signal handler to disable/enable keyboard input
# when the process moves between foreground and background.
self.old_handlers[signal.SIGTSTP] = signal.signal(
signal.SIGTSTP, self._tstp_handler)
# Apply new settings for terminal
termios.tcsetattr(self.stream, termios.TCSADRAIN, self.new_cfg)
self.old_cfg = old_cfg
# add an atexit handler to ensure the terminal is restored
atexit.register(self._restore_default_terminal_settings)
except Exception:
pass # some OS's do not support termios, so ignore
# enable keyboard input initially (if foreground)
if not self._is_background():
self._enable_keyboard_input()
return self
def __exit__(self, exc_type, exception, traceback):
"""If termios was avaialble, restore old settings."""
"""If termios was available, restore old settings."""
if self.old_cfg:
with background_safe(): # change it back even if backgrounded now
termios.tcsetattr(self.stream, termios.TCSADRAIN, self.old_cfg)
self._restore_default_terminal_settings()
# restore the previous SIGTSTP handler
if self.old_handlers:
for signum, old_handler in self.old_handlers.items():
signal.signal(signum, old_handler)
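
To make the polling contract concrete, here is a minimal sketch (not
part of this diff) of a caller driving ``keyboard_input`` the way the
writer daemon below does; the 0.1-second select timeout and the 'q'
quit key are illustrative::

    import select
    import sys

    from llnl.util.tty.log import keyboard_input

    with keyboard_input(sys.stdin) as kb:
        while True:
            # re-apply terminal settings if we moved between fg and bg
            kb.check_fg_bg()
            # coarse timeout so check_fg_bg() runs even with no input
            rlist, _, _ = select.select([sys.stdin], [], [], 0.1)
            if rlist and sys.stdin.read(1) == 'q':
                break
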
class Unbuffered(object):
@@ -300,11 +416,11 @@ def __enter__(self):
self._saved_debug = tty._debug
# OS-level pipe for redirecting output to logger
self.read_fd, self.write_fd = os.pipe()
read_fd, write_fd = os.pipe()
# Multiprocessing pipe for communication back from the daemon
# Currently only used to save echo value between uses
self.parent, self.child = multiprocessing.Pipe()
self.parent_pipe, child_pipe = multiprocessing.Pipe()
# Sets a daemon that writes to file what it reads from a pipe
try:
@@ -315,10 +431,15 @@ def __enter__(self):
input_stream = None # just don't forward input if this fails
self.process = multiprocessing.Process(
target=self._writer_daemon, args=(input_stream,))
target=_writer_daemon,
args=(
input_stream, read_fd, write_fd, self.echo, self.log_file,
child_pipe
)
)
self.process.daemon = True # must set before start()
self.process.start()
os.close(self.read_fd) # close in the parent process
os.close(read_fd) # close in the parent process
finally:
if input_stream:
@@ -340,9 +461,9 @@ def __enter__(self):
self._saved_stderr = os.dup(sys.stderr.fileno())
# redirect to the pipe we created above
os.dup2(self.write_fd, sys.stdout.fileno())
os.dup2(self.write_fd, sys.stderr.fileno())
os.close(self.write_fd)
os.dup2(write_fd, sys.stdout.fileno())
os.dup2(write_fd, sys.stderr.fileno())
os.close(write_fd)
else:
# Handle I/O the Python way. This won't redirect lower-level
@@ -355,7 +476,7 @@ def __enter__(self):
self._saved_stderr = sys.stderr
# create a file object for the pipe; redirect to it.
pipe_fd_out = os.fdopen(self.write_fd, 'w')
pipe_fd_out = os.fdopen(write_fd, 'w')
sys.stdout = pipe_fd_out
sys.stderr = pipe_fd_out
@@ -394,14 +515,14 @@ def __exit__(self, exc_type, exc_val, exc_tb):
# print log contents in parent if needed.
if self.write_log_in_parent:
string = self.parent.recv()
string = self.parent_pipe.recv()
self.file_like.write(string)
if self.close_log_in_parent:
self.log_file.close()
# recover and store echo settings from the child before it dies
self.echo = self.parent.recv()
self.echo = self.parent_pipe.recv()
# join the daemon process. The daemon will quit automatically
# when the write pipe is closed; we just wait for it here.
@@ -426,90 +547,166 @@ def force_echo(self):
# exactly before and after the text we want to echo.
sys.stdout.write(xon)
sys.stdout.flush()
yield
sys.stdout.write(xoff)
sys.stdout.flush()
def _writer_daemon(self, stdin):
"""Daemon that writes output to the log file and stdout."""
# Use line buffering (3rd param = 1) since Python 3 has a bug
# that prevents unbuffered text I/O.
in_pipe = os.fdopen(self.read_fd, 'r', 1)
os.close(self.write_fd)
echo = self.echo # initial echo setting, user-controllable
force_echo = False # parent can force echo for certain output
# list of streams to select from
istreams = [in_pipe, stdin] if stdin else [in_pipe]
log_file = self.log_file
def handle_write(force_echo):
# Handle output from the with block process.
# If we arrive here it means that in_pipe was
# ready for reading : it should never happen that
# line is false-ish
line = in_pipe.readline()
if not line:
return (True, force_echo) # break while loop
# find control characters and strip them.
controls = control.findall(line)
line = re.sub(control, '', line)
# Echo to stdout if requested or forced
if echo or force_echo:
try:
if termios:
conf = termios.tcgetattr(sys.stdout)
tostop = conf[3] & termios.TOSTOP
else:
tostop = True
except Exception:
tostop = True
if not (tostop and _is_background_tty()):
sys.stdout.write(line)
sys.stdout.flush()
# Stripped output to log file.
log_file.write(_strip(line))
log_file.flush()
if xon in controls:
force_echo = True
if xoff in controls:
force_echo = False
return (False, force_echo)
try:
with _keyboard_input(stdin):
while True:
# No need to set any timeout for select.select
# Wait until a key press or an event on in_pipe.
rlist, _, _ = select.select(istreams, [], [])
# Allow user to toggle echo with 'v' key.
# Currently ignores other chars.
# only read stdin if we're in the foreground
if stdin in rlist and not _is_background_tty():
if stdin.read(1) == 'v':
echo = not echo
if in_pipe in rlist:
br, fe = handle_write(force_echo)
force_echo = fe
if br:
break
except BaseException:
tty.error("Exception occurred in writer daemon!")
traceback.print_exc()
yield
finally:
# send written data back to parent if we used a StringIO
if self.write_log_in_parent:
self.child.send(log_file.getvalue())
log_file.close()
sys.stdout.write(xoff)
sys.stdout.flush()
# send echo value back to the parent so it can be preserved.
self.child.send(echo)
def _writer_daemon(stdin, read_fd, write_fd, echo, log_file, control_pipe):
"""Daemon used by ``log_output`` to write to a log file and to ``stdout``.
The daemon receives output from the parent process and writes it both
to a log and, optionally, to ``stdout``. The relationship looks like
this::
Terminal
|
| +-------------------------+
| | Parent Process |
+--------> | with log_output(): |
| stdin | ... |
| +-------------------------+
| ^ | write_fd (parent's redirected stdout)
| | control |
| | pipe |
| | v read_fd
| +-------------------------+ stdout
| | Writer daemon |------------>
+--------> | read from read_fd | log_file
stdin | write to out and log |------------>
+-------------------------+
Within the ``log_output`` handler, the parent's output is redirected
to a pipe from which the daemon reads. The daemon writes each line
from the pipe to a log file and (optionally) to ``stdout``. The user
can hit ``v`` to toggle output on ``stdout``.
In addition to the input and output file descriptors, the daemon
interacts with the parent via ``control_pipe``. It reports whether
``stdout`` was enabled or disabled when it finished and, if the
``log_file`` is a ``StringIO`` object, then the daemon also sends the
logged output back to the parent as a string, to be written to the
``StringIO`` in the parent. This is mainly for testing.
Arguments:
stdin (stream): input from the terminal
read_fd (int): pipe for reading from parent's redirected stdout
write_fd (int): the parent's write end of the pipe (closed
immediately by the writer daemon, which only reads)
echo (bool): initial echo setting -- controlled by user and
preserved across multiple writer daemons
log_file (file-like): file to log all output
control_pipe (Pipe): multiprocessing pipe on which to send control
information to the parent
"""
# Use line buffering (3rd param = 1) since Python 3 has a bug
# that prevents unbuffered text I/O.
in_pipe = os.fdopen(read_fd, 'r', 1)
os.close(write_fd)
# list of streams to select from
istreams = [in_pipe, stdin] if stdin else [in_pipe]
force_echo = False # parent can force echo for certain output
try:
with keyboard_input(stdin) as kb:
while True:
# fix the terminal settings if we recently came to
# the foreground
kb.check_fg_bg()
# wait for input from any stream. use a coarse timeout to
# allow other checks while we wait for input
rlist, _, _ = _retry(select.select)(istreams, [], [], 1e-1)
# Allow user to toggle echo with 'v' key.
# Currently ignores other chars.
# only read stdin if we're in the foreground
if stdin in rlist and not _is_background_tty(stdin):
# it's possible to be backgrounded between the above
# check and the read, so we ignore SIGTTIN here.
with ignore_signal(signal.SIGTTIN):
try:
if stdin.read(1) == 'v':
echo = not echo
except IOError as e:
# If SIGTTIN is ignored, the system gives EIO
# to let the caller know the read failed b/c it
# was in the bg. Ignore that too.
if e.errno != errno.EIO:
raise
if in_pipe in rlist:
# Handle output from the calling process.
line = _retry(in_pipe.readline)()
if not line:
break
# find control characters and strip them.
controls = control.findall(line)
line = control.sub('', line)
# Echo to stdout if requested or forced.
if echo or force_echo:
sys.stdout.write(line)
sys.stdout.flush()
# Stripped output to log file.
log_file.write(_strip(line))
log_file.flush()
if xon in controls:
force_echo = True
if xoff in controls:
force_echo = False
except BaseException:
tty.error("Exception occurred in writer daemon!")
traceback.print_exc()
finally:
# send written data back to parent if we used a StringIO
if isinstance(log_file, StringIO):
control_pipe.send(log_file.getvalue())
log_file.close()
# send echo value back to the parent so it can be preserved.
control_pipe.send(echo)
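
For orientation, the ``xon``/``xoff`` control characters handled
above are written by ``log_output``'s ``force_echo()`` context
manager (shown earlier in this diff) around output that must always
reach the terminal. A short usage sketch, with an arbitrary log
path::

    from llnl.util.tty.log import log_output

    with log_output('build.log') as logger:
        with logger.force_echo():
            print('==> Installing foo')  # bracketed by xon/xoff; always echoed
        print('cc -c foo.c')  # logged; echoed only when 'v' toggles echo on
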
def _retry(function):
"""Retry a call if errors indicating an interrupted system call occur.
Interrupted system calls return -1 and set ``errno`` to ``EINTR`` if
certain flags are not set. Newer Pythons automatically retry them,
but older Pythons do not, so we need to retry the calls.
This function converts a call like this:
syscall(args)
and makes it retry by wrapping the function like this:
_retry(syscall)(args)
This is a private function because EINTR is unfortunately raised in
different ways from different functions, and we only handle the ones
relevant for this file.
"""
def wrapped(*args, **kwargs):
while True:
try:
return function(*args, **kwargs)
except IOError as e:
if e.errno == errno.EINTR:
continue
raise
except select.error as e:
if e.args[0] == errno.EINTR:
continue
raise
return wrapped
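
As used in ``_writer_daemon`` above, the wrapper is applied at the
call site, for example::

    rlist, _, _ = _retry(select.select)(istreams, [], [], 1e-1)
    line = _retry(in_pipe.readline)()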


@@ -0,0 +1,344 @@
# Copyright 2013-2020 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
"""The pty module handles pseudo-terminals.
Currently, the infrastructure here is only used to test llnl.util.tty.log.
If this is used outside a testing environment, we will want to reconsider
things like timeouts in ``ProcessController.wait()``, which are set to
get tests done quickly, not to avoid high CPU usage.
"""
from __future__ import print_function
import os
import signal
import multiprocessing
import re
import sys
import termios
import time
import traceback
import llnl.util.tty.log as log
from spack.util.executable import which
class ProcessController(object):
"""Wrapper around some fundamental process control operations.
This allows one process to drive another similar to the way a shell
would, by sending signals and I/O.
"""
def __init__(self, pid, master_fd,
timeout=1, sleep_time=1e-1, debug=False):
"""Create a controller to manipulate the process with id ``pid``
Args:
pid (int): id of process to control
master_fd (int): master file descriptor attached to pid's stdin
timeout (int): time in seconds for wait operations to time out
(default 1 second)
sleep_time (float): time in seconds to sleep after signals, to
control the signal rate of the controller (default 1e-1)
debug (bool): whether ``horizontal_line()`` and ``status()`` should
produce output when called (default False)
``sleep_time`` allows the caller to insert delays after calls
that signal or modify the controlled process. Python behaves very
poorly if signals arrive too fast, and drowning a Python process
that has Python signal handlers can kill the process and hang our
tests, so we throttle this to a closer-to-interactive rate.
"""
self.pid = pid
self.pgid = os.getpgid(pid)
self.master_fd = master_fd
self.timeout = timeout
self.sleep_time = sleep_time
self.debug = debug
# we need the ps command to wait for process statuses
self.ps = which("ps", required=True)
def get_canon_echo_attrs(self):
"""Get echo and canon attributes of the terminal of master_fd."""
cfg = termios.tcgetattr(self.master_fd)
return (
bool(cfg[3] & termios.ICANON),
bool(cfg[3] & termios.ECHO),
)
def horizontal_line(self, name):
"""Labled horizontal line for debugging."""
if self.debug:
sys.stderr.write(
"------------------------------------------- %s\n" % name
)
def status(self):
"""Print debug message with status info for the child."""
if self.debug:
canon, echo = self.get_canon_echo_attrs()
sys.stderr.write("canon: %s, echo: %s\n" % (
"on" if canon else "off",
"on" if echo else "off",
))
sys.stderr.write("input: %s\n" % self.input_on())
sys.stderr.write("bg: %s\n" % self.background())
sys.stderr.write("\n")
def input_on(self):
"""True if keyboard input is enabled on the master_fd pty."""
return self.get_canon_echo_attrs() == (False, False)
def background(self):
"""True if pgid is in a background pgroup of master_fd's terminal."""
return self.pgid != os.tcgetpgrp(self.master_fd)
def tstp(self):
"""Send SIGTSTP to the controlled process."""
self.horizontal_line("tstp")
os.killpg(self.pgid, signal.SIGTSTP)
time.sleep(self.sleep_time)
def cont(self):
self.horizontal_line("cont")
os.killpg(self.pgid, signal.SIGCONT)
time.sleep(self.sleep_time)
def fg(self):
self.horizontal_line("fg")
with log.ignore_signal(signal.SIGTTOU):
os.tcsetpgrp(self.master_fd, os.getpgid(self.pid))
time.sleep(self.sleep_time)
def bg(self):
self.horizontal_line("bg")
with log.ignore_signal(signal.SIGTTOU):
os.tcsetpgrp(self.master_fd, os.getpgrp())
time.sleep(self.sleep_time)
def write(self, byte_string):
self.horizontal_line("write '%s'" % byte_string.decode("utf-8"))
os.write(self.master_fd, byte_string)
def wait(self, condition):
start = time.time()
while (((time.time() - start) < self.timeout) and not condition()):
time.sleep(1e-2)
assert condition()
def wait_enabled(self):
self.wait(lambda: self.input_on() and not self.background())
def wait_disabled(self):
self.wait(lambda: not self.input_on() and self.background())
def wait_disabled_fg(self):
self.wait(lambda: not self.input_on() and not self.background())
def proc_status(self):
status = self.ps("-p", str(self.pid), "-o", "stat", output=str)
status = re.split(r"\s+", status.strip(), flags=re.M)
return status[1]
def wait_stopped(self):
self.wait(lambda: "T" in self.proc_status())
def wait_running(self):
self.wait(lambda: "T" not in self.proc_status())
class PseudoShell(object):
"""Sets up master and child processes with a PTY.
You can create a ``PseudoShell`` if you want to test how some
function responds to terminal input. This is a pseudo-shell from a
job control perspective; ``master_function`` and ``child_function``
are set up with a pseudoterminal (pty) so that the master can drive
the child through process control signals and I/O.
The two functions should have signatures like this::
def master_function(proc, ctl, **kwargs)
def child_function(**kwargs)
``master_function`` is spawned in its own process and passed three
arguments:
proc
the ``multiprocessing.Process`` object representing the child
ctl
a ``ProcessController`` object tied to the child
kwargs
keyword arguments passed from ``PseudoShell.start()``.
``child_function`` is only passed ``kwargs`` delegated from
``PseudoShell.start()``.
``ctl.master_fd`` is connected to ``sys.stdin`` in the child
process. Both processes share the same ``sys.stdout`` and
``sys.stderr`` as the process instantiating ``PseudoShell``.
Here are the relationships between processes created::
._________________________________________________________.
| Child Process | pid 2
| - runs child_function | pgroup 2
|_________________________________________________________| session 1
^
| create process with master_fd connected to stdin
| stdout, stderr are the same as caller
._________________________________________________________.
| Master Process | pid 1
| - runs master_function | pgroup 1
| - uses ProcessController and master_fd to control child | session 1
|_________________________________________________________|
^
| create process
| stdin, stdout, stderr are the same as caller
._________________________________________________________.
| Caller | pid 0
| - Constructs, starts, joins PseudoShell | pgroup 0
| - provides master_function, child_function | session 0
|_________________________________________________________|
"""
def __init__(self, master_function, child_function):
self.proc = None
self.master_function = master_function
self.child_function = child_function
# these can be optionally set to change defaults
self.controller_timeout = 1
self.sleep_time = 0
def start(self, **kwargs):
"""Start the master and child processes.
Arguments:
kwargs (dict): arbitrary keyword arguments that will be
passed to master and child functions
The master process will create the child, then call
``master_function``. The child process will call
``child_function``.
"""
self.proc = multiprocessing.Process(
target=PseudoShell._set_up_and_run_master_function,
args=(self.master_function, self.child_function,
self.controller_timeout, self.sleep_time),
kwargs=kwargs,
)
self.proc.start()
def join(self):
"""Wait for the child process to finish, and return its exit code."""
self.proc.join()
return self.proc.exitcode
@staticmethod
def _set_up_and_run_child_function(
tty_name, stdout_fd, stderr_fd, ready, child_function, **kwargs):
"""Child process wrapper for PseudoShell.
Handles the mechanics of setting up a PTY, then calls
``child_function``.
"""
# new process group, like a command or pipeline launched by a shell
os.setpgrp()
# take controlling terminal and set up pty IO
stdin_fd = os.open(tty_name, os.O_RDWR)
os.dup2(stdin_fd, sys.stdin.fileno())
os.dup2(stdout_fd, sys.stdout.fileno())
os.dup2(stderr_fd, sys.stderr.fileno())
os.close(stdin_fd)
if kwargs.get("debug"):
sys.stderr.write(
"child: stdin.isatty(): %s\n" % sys.stdin.isatty())
# tell the parent that we're really running
if kwargs.get("debug"):
sys.stderr.write("child: ready!\n")
ready.value = True
try:
child_function(**kwargs)
except BaseException:
traceback.print_exc()
@staticmethod
def _set_up_and_run_master_function(
master_function, child_function, controller_timeout, sleep_time,
**kwargs):
"""Set up a pty, spawn a child process, and execute master_function.
Handles the mechanics of setting up a PTY, then calls
``master_function``.
"""
os.setsid() # new session; this process is the controller
master_fd, child_fd = os.openpty()
pty_name = os.ttyname(child_fd)
# take controlling terminal
pty_fd = os.open(pty_name, os.O_RDWR)
os.close(pty_fd)
ready = multiprocessing.Value('i', False)
child_process = multiprocessing.Process(
target=PseudoShell._set_up_and_run_child_function,
args=(pty_name, sys.stdout.fileno(), sys.stderr.fileno(),
ready, child_function),
kwargs=kwargs,
)
child_process.start()
# wait for subprocess to be running and connected.
while not ready.value:
time.sleep(1e-5)
if kwargs.get("debug"):
sys.stderr.write("pid: %d\n" % os.getpid())
sys.stderr.write("pgid: %d\n" % os.getpgrp())
sys.stderr.write("sid: %d\n" % os.getsid(0))
sys.stderr.write("tcgetpgrp: %d\n" % os.tcgetpgrp(master_fd))
sys.stderr.write("\n")
child_pgid = os.getpgid(child_process.pid)
sys.stderr.write("child pid: %d\n" % child_process.pid)
sys.stderr.write("child pgid: %d\n" % child_pgid)
sys.stderr.write("child sid: %d\n" % os.getsid(child_process.pid))
sys.stderr.write("\n")
sys.stderr.flush()
# set up master to ignore SIGTSTP, like a shell
signal.signal(signal.SIGTSTP, signal.SIG_IGN)
# call the master function once the child is ready
try:
controller = ProcessController(
child_process.pid, master_fd, debug=kwargs.get("debug"))
controller.timeout = controller_timeout
controller.sleep_time = sleep_time
error = master_function(child_process, controller, **kwargs)
except BaseException:
error = 1
traceback.print_exc()
child_process.join()
# return whether either the parent or child failed
return error or child_process.exitcode
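
To show how these pieces fit together, a minimal sketch of a
``PseudoShell`` session (mirroring ``simple_logger`` and
``mock_shell_fg`` from the tests later in this diff; it assumes
termios support and the ``ps`` utility, and the ``master``/``child``
names are hypothetical)::

    import os
    import signal
    import time

    from llnl.util.tty.log import log_output
    from llnl.util.tty.pty import PseudoShell

    def master(proc, ctl, **kwargs):
        ctl.fg()                           # foreground the child on the pty
        ctl.wait_enabled()                 # wait for keyboard input to be on
        os.kill(proc.pid, signal.SIGUSR1)  # ask the child to exit

    def child(**kwargs):
        done = []
        signal.signal(signal.SIGUSR1, lambda sig, frame: done.append(True))
        with log_output(os.devnull):       # log_output enables keyboard_input
            while not done:
                print("line")
                time.sleep(1e-3)

    shell = PseudoShell(master, child)
    shell.start()
    assert shell.join() == 0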


@@ -130,3 +130,10 @@ def test_load_modules_from_file(module_path):
foo = llnl.util.lang.load_module_from_file('foo', module_path)
assert foo.value == 1
assert foo.path == os.path.join('/usr', 'bin')
def test_uniq():
assert [1, 2, 3] == llnl.util.lang.uniq([1, 2, 3])
assert [1, 2, 3] == llnl.util.lang.uniq([1, 1, 1, 1, 2, 2, 2, 3, 3])
assert [1, 2, 1] == llnl.util.lang.uniq([1, 1, 1, 1, 2, 2, 2, 1, 1])
assert [] == llnl.util.lang.uniq([])


@@ -1,84 +0,0 @@
# Copyright 2013-2020 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
from __future__ import print_function
import pytest
from llnl.util.tty.log import log_output
from spack.util.executable import which
def test_log_python_output_with_python_stream(capsys, tmpdir):
# pytest's DontReadFromInput object does not like what we do here, so
# disable capsys or things hang.
with tmpdir.as_cwd():
with capsys.disabled():
with log_output('foo.txt'):
print('logged')
with open('foo.txt') as f:
assert f.read() == 'logged\n'
assert capsys.readouterr() == ('', '')
def test_log_python_output_with_fd_stream(capfd, tmpdir):
with tmpdir.as_cwd():
with log_output('foo.txt'):
print('logged')
with open('foo.txt') as f:
assert f.read() == 'logged\n'
# Coverage is cluttering stderr during tests
assert capfd.readouterr()[0] == ''
def test_log_python_output_and_echo_output(capfd, tmpdir):
with tmpdir.as_cwd():
with log_output('foo.txt') as logger:
with logger.force_echo():
print('echo')
print('logged')
# Coverage is cluttering stderr during tests
assert capfd.readouterr()[0] == 'echo\n'
with open('foo.txt') as f:
assert f.read() == 'echo\nlogged\n'
@pytest.mark.skipif(not which('echo'), reason="needs echo command")
def test_log_subproc_output(capsys, tmpdir):
echo = which('echo')
# pytest seems to interfere here, so we need to use capsys.disabled()
# TODO: figure out why this is and whether it means we're doing
something wrong with OUR redirects. Seems like it should work even
# with capsys enabled.
with tmpdir.as_cwd():
with capsys.disabled():
with log_output('foo.txt'):
echo('logged')
with open('foo.txt') as f:
assert f.read() == 'logged\n'
@pytest.mark.skipif(not which('echo'), reason="needs echo command")
def test_log_subproc_and_echo_output(capfd, tmpdir):
echo = which('echo')
with tmpdir.as_cwd():
with log_output('foo.txt') as logger:
with logger.force_echo():
echo('echo')
print('logged')
# Coverage is cluttering stderr during tests
assert capfd.readouterr()[0] == 'echo\n'
with open('foo.txt') as f:
assert f.read() == 'logged\n'


@@ -0,0 +1,442 @@
# Copyright 2013-2020 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
from __future__ import print_function
import contextlib
import multiprocessing
import os
import signal
import sys
import time
try:
import termios
except ImportError:
termios = None
import pytest
import llnl.util.tty.log
from llnl.util.lang import uniq
from llnl.util.tty.log import log_output
from llnl.util.tty.pty import PseudoShell
from spack.util.executable import which
@contextlib.contextmanager
def nullcontext():
yield
def test_log_python_output_with_echo(capfd, tmpdir):
with tmpdir.as_cwd():
with log_output('foo.txt', echo=True):
print('logged')
# foo.txt has output
with open('foo.txt') as f:
assert f.read() == 'logged\n'
# output is also echoed.
assert capfd.readouterr()[0] == 'logged\n'
def test_log_python_output_without_echo(capfd, tmpdir):
with tmpdir.as_cwd():
with log_output('foo.txt'):
print('logged')
# foo.txt has output
with open('foo.txt') as f:
assert f.read() == 'logged\n'
# nothing on stdout or stderr
assert capfd.readouterr()[0] == ''
def test_log_python_output_and_echo_output(capfd, tmpdir):
with tmpdir.as_cwd():
# echo two lines
with log_output('foo.txt') as logger:
with logger.force_echo():
print('force echo')
print('logged')
# log file contains everything
with open('foo.txt') as f:
assert f.read() == 'force echo\nlogged\n'
# only force-echo'd stuff is in output
assert capfd.readouterr()[0] == 'force echo\n'
@pytest.mark.skipif(not which('echo'), reason="needs echo command")
def test_log_subproc_and_echo_output_no_capfd(capfd, tmpdir):
echo = which('echo')
# this is split into two tests because capfd interferes with the
# output logged to file when using a subprocess. We test the file
# here, and echoing in test_log_subproc_and_echo_output_capfd below.
with capfd.disabled():
with tmpdir.as_cwd():
with log_output('foo.txt') as logger:
with logger.force_echo():
echo('echo')
print('logged')
with open('foo.txt') as f:
assert f.read() == 'echo\nlogged\n'
@pytest.mark.skipif(not which('echo'), reason="needs echo command")
def test_log_subproc_and_echo_output_capfd(capfd, tmpdir):
echo = which('echo')
# This tests *only* what is echoed when using a subprocess, as capfd
# interferes with the logged data. See
# test_log_subproc_and_echo_output_no_capfd for tests on the logfile.
with tmpdir.as_cwd():
with log_output('foo.txt') as logger:
with logger.force_echo():
echo('echo')
print('logged')
assert capfd.readouterr()[0] == "echo\n"
#
# Tests below use a pseudoterminal to test llnl.util.tty.log
#
def simple_logger(**kwargs):
"""Mock logger (child) process for testing log.keyboard_input."""
def handler(signum, frame):
running[0] = False
signal.signal(signal.SIGUSR1, handler)
log_path = kwargs["log_path"]
running = [True]
with log_output(log_path):
while running[0]:
print("line")
time.sleep(1e-3)
def mock_shell_fg(proc, ctl, **kwargs):
"""PseudoShell master function for test_foreground_background."""
ctl.fg()
ctl.status()
ctl.wait_enabled()
os.kill(proc.pid, signal.SIGUSR1)
def mock_shell_fg_no_termios(proc, ctl, **kwargs):
"""PseudoShell master function for test_foreground_background."""
ctl.fg()
ctl.status()
ctl.wait_disabled_fg()
os.kill(proc.pid, signal.SIGUSR1)
def mock_shell_bg(proc, ctl, **kwargs):
"""PseudoShell master function for test_foreground_background."""
ctl.bg()
ctl.status()
ctl.wait_disabled()
os.kill(proc.pid, signal.SIGUSR1)
def mock_shell_tstp_cont(proc, ctl, **kwargs):
"""PseudoShell master function for test_foreground_background."""
ctl.tstp()
ctl.wait_stopped()
ctl.cont()
ctl.wait_running()
os.kill(proc.pid, signal.SIGUSR1)
def mock_shell_tstp_tstp_cont(proc, ctl, **kwargs):
"""PseudoShell master function for test_foreground_background."""
ctl.tstp()
ctl.wait_stopped()
ctl.tstp()
ctl.wait_stopped()
ctl.cont()
ctl.wait_running()
os.kill(proc.pid, signal.SIGUSR1)
def mock_shell_tstp_tstp_cont_cont(proc, ctl, **kwargs):
"""PseudoShell master function for test_foreground_background."""
ctl.tstp()
ctl.wait_stopped()
ctl.tstp()
ctl.wait_stopped()
ctl.cont()
ctl.wait_running()
ctl.cont()
ctl.wait_running()
os.kill(proc.pid, signal.SIGUSR1)
def mock_shell_bg_fg(proc, ctl, **kwargs):
"""PseudoShell master function for test_foreground_background."""
ctl.bg()
ctl.status()
ctl.wait_disabled()
ctl.fg()
ctl.status()
ctl.wait_enabled()
os.kill(proc.pid, signal.SIGUSR1)
def mock_shell_bg_fg_no_termios(proc, ctl, **kwargs):
"""PseudoShell master function for test_foreground_background."""
ctl.bg()
ctl.status()
ctl.wait_disabled()
ctl.fg()
ctl.status()
ctl.wait_disabled_fg()
os.kill(proc.pid, signal.SIGUSR1)
def mock_shell_fg_bg(proc, ctl, **kwargs):
"""PseudoShell master function for test_foreground_background."""
ctl.fg()
ctl.status()
ctl.wait_enabled()
ctl.bg()
ctl.status()
ctl.wait_disabled()
os.kill(proc.pid, signal.SIGUSR1)
def mock_shell_fg_bg_no_termios(proc, ctl, **kwargs):
"""PseudoShell master function for test_foreground_background."""
ctl.fg()
ctl.status()
ctl.wait_disabled_fg()
ctl.bg()
ctl.status()
ctl.wait_disabled()
os.kill(proc.pid, signal.SIGUSR1)
@contextlib.contextmanager
def no_termios():
saved = llnl.util.tty.log.termios
llnl.util.tty.log.termios = None
try:
yield
finally:
llnl.util.tty.log.termios = saved
@pytest.mark.skipif(not which("ps"), reason="requires ps utility")
@pytest.mark.skipif(not termios, reason="requires termios support")
@pytest.mark.parametrize('test_fn,termios_on_or_off', [
# tests with termios
(mock_shell_fg, nullcontext),
(mock_shell_bg, nullcontext),
(mock_shell_bg_fg, nullcontext),
(mock_shell_fg_bg, nullcontext),
(mock_shell_tstp_cont, nullcontext),
(mock_shell_tstp_tstp_cont, nullcontext),
(mock_shell_tstp_tstp_cont_cont, nullcontext),
# tests without termios
(mock_shell_fg_no_termios, no_termios),
(mock_shell_bg, no_termios),
(mock_shell_bg_fg_no_termios, no_termios),
(mock_shell_fg_bg_no_termios, no_termios),
(mock_shell_tstp_cont, no_termios),
(mock_shell_tstp_tstp_cont, no_termios),
(mock_shell_tstp_tstp_cont_cont, no_termios),
])
def test_foreground_background(test_fn, termios_on_or_off, tmpdir):
"""Functional tests for foregrounding and backgrounding a logged process.
This ensures that things like SIGTTOU are not raised and that
terminal settings are corrected on foreground/background and on
process stop and start.
"""
shell = PseudoShell(test_fn, simple_logger)
log_path = str(tmpdir.join("log.txt"))
# run the shell test
with termios_on_or_off():
shell.start(log_path=log_path, debug=True)
exitcode = shell.join()
# processes completed successfully
assert exitcode == 0
# assert log was created
assert os.path.exists(log_path)
def synchronized_logger(**kwargs):
"""Mock logger (child) process for testing log.keyboard_input.
This logger synchronizes with the parent process to test that 'v' can
toggle output. It is used in ``test_foreground_background_output`` below.
"""
def handler(signum, frame):
running[0] = False
signal.signal(signal.SIGUSR1, handler)
log_path = kwargs["log_path"]
write_lock = kwargs["write_lock"]
v_lock = kwargs["v_lock"]
running = [True]
sys.stderr.write(os.getcwd() + "\n")
with log_output(log_path) as logger:
with logger.force_echo():
print("forced output")
while running[0]:
with write_lock:
if v_lock.acquire(False): # non-blocking acquire
print("off")
v_lock.release()
else:
print("on") # lock held; v is toggled on
time.sleep(1e-2)
def mock_shell_v_v(proc, ctl, **kwargs):
"""PseudoShell master function for test_foreground_background_output."""
write_lock = kwargs["write_lock"]
v_lock = kwargs["v_lock"]
ctl.fg()
ctl.wait_enabled()
time.sleep(.1)
write_lock.acquire() # suspend writing
v_lock.acquire() # enable v lock
ctl.write(b'v') # toggle v on stdin
time.sleep(.1)
write_lock.release() # resume writing
time.sleep(.1)
write_lock.acquire() # suspend writing
ctl.write(b'v') # toggle v on stdin
time.sleep(.1)
v_lock.release() # disable v lock
write_lock.release() # resume writing
time.sleep(.1)
os.kill(proc.pid, signal.SIGUSR1)
def mock_shell_v_v_no_termios(proc, ctl, **kwargs):
"""PseudoShell master function for test_foreground_background_output."""
write_lock = kwargs["write_lock"]
v_lock = kwargs["v_lock"]
ctl.fg()
ctl.wait_disabled_fg()
time.sleep(.1)
write_lock.acquire() # suspend writing
v_lock.acquire() # enable v lock
ctl.write(b'v\n') # toggle v on stdin
time.sleep(.1)
write_lock.release() # resume writing
time.sleep(.1)
write_lock.acquire() # suspend writing
ctl.write(b'v\n') # toggle v on stdin
time.sleep(.1)
v_lock.release() # disable v lock
write_lock.release() # resume writing
time.sleep(.1)
os.kill(proc.pid, signal.SIGUSR1)
@pytest.mark.skipif(not which("ps"), reason="requires ps utility")
@pytest.mark.skipif(not termios, reason="requires termios support")
@pytest.mark.parametrize('test_fn,termios_on_or_off', [
(mock_shell_v_v, nullcontext),
(mock_shell_v_v_no_termios, no_termios),
])
def test_foreground_background_output(
test_fn, capfd, termios_on_or_off, tmpdir):
"""Tests hitting 'v' toggles output, and that force_echo works."""
shell = PseudoShell(test_fn, synchronized_logger)
log_path = str(tmpdir.join("log.txt"))
# Locks for synchronizing with child
write_lock = multiprocessing.Lock() # must be held by child to write
v_lock = multiprocessing.Lock() # held while master is in v mode
with termios_on_or_off():
shell.start(
write_lock=write_lock,
v_lock=v_lock,
debug=True,
log_path=log_path
)
exitcode = shell.join()
out, err = capfd.readouterr()
print(err) # will be shown if something goes wrong
print(out)
# processes completed successfully
assert exitcode == 0
# split output into lines
output = out.strip().split("\n")
# also get lines of log file
assert os.path.exists(log_path)
with open(log_path) as log:
log = log.read().strip().split("\n")
# Master and child process coordinate with locks such that the child
# writes "off" when echo is off, and "on" when echo is on. The
# output should contain mostly "on" lines, but may contain an "off"
# or two. This is because the master toggles echo by sending "v" on
# stdin to the child, but this is not synchronized with our locks.
# It's good enough for a test, though. We allow at most 2 "off"'s in
# the output to account for the race.
assert (
['forced output', 'on'] == uniq(output) or
output.count("off") <= 2 # if master_fd is a bit slow
)
# log should be off for a while, then on, then off
assert (
['forced output', 'off', 'on', 'off'] == uniq(log) and
log.count("off") > 2 # ensure some "off" lines were omitted
)