Rework output redirection in Spack.

- Simplify interface to log_output. New interface requires only one
  context handler instead of two.  Before:

      with log_output('logfile.txt') as log_redirection:
           with log_redirection:
               # do things ... output will be logged

  After:

      with log_output('logfile.txt'):
          # do things ... output will be logged

  If you also want the output to be echoed to ``stdout``, use the
  `echo` parameter::

      with log_output('logfile.txt', echo=True):
          # do things ... output will be logged and printed out

  And, if you just want to echo *some* stuff from the parent, use
  ``force_echo``:

      with log_output('logfile.txt', echo=False) as logger:
          # do things ... output will be logged

          with logger.force_echo():
              # things here will be echoed *and* logged

  A key difference between this and the previous implementation is that
  *everything* in the context handler is logged.  Previously, things like
  `Executing phase 'configure'` would not be logged, only output to the
  screen, so understanding phases in the build log was difficult.

- The implementation of `log_output()` is different in two major ways:

  1. This implementation avoids race cases by using only one pipe (before
     we had a multiprocessing pipe and a unix pipe).  The logger daemon
     stops naturally when the input stream is closed, which avoids a race
     in the previous implementation where we'd miss some lines of output
     because the parent would shut the daemon down before it was done
     with all output.

  2. Instead of turning output redirection on and off, which prevented
     some things from being logged, this version uses control characters
     in the output stream to enable/disable forced echoing.  We're using
     the time-honored xon and xoff codes, which tell the daemon to echo
     anything between them AND write it to the log.  This is how
     `logger.force_echo()` works.

- Fix places where output could get stuck in buffers by flushing more
  aggressively.  This makes the output printed to the terminal the same
  as that which would be printed through a pipe to `cat` or to a file.
  Previously these could be weirdly different, and some output would be
  missing when redirecting Spack to a file or pipe.

- Simplify input and color handling in both `build_environment.fork()`
  and `llnl.util.tty.log.log_output()`.  Neither requires an input_stream
  parameter anymore; we assume stdin will be forwarded if possible.

- remove `llnl.util.lang.duplicate_stream()` and remove associated
  monkey-patching in tests, as these aren't needed if you just check
  whether stdin is a tty and has a fileno attribute.
This commit is contained in:
Todd Gamblin 2017-08-14 04:33:01 -07:00
parent e0dd55e090
commit 05cc6c966f
5 changed files with 265 additions and 235 deletions

View File

@ -391,19 +391,6 @@ def __init__(self, message):
super(RequiredAttributeError, self).__init__(message)
def duplicate_stream(original):
"""Duplicates a stream at the os level.
Args:
original (stream): original stream to be duplicated. Must have a
``fileno`` callable attribute.
Returns:
file like object: duplicate of the original stream
"""
return os.fdopen(os.dup(original.fileno()))
class ObjectWrapper(object):
"""Base class that wraps an object. Derived classes can add new behavior
while staying undercover.

View File

@ -29,27 +29,28 @@
import re
import select
import sys
from contextlib import contextmanager
import llnl.util.lang as lang
import llnl.util.tty as tty
import llnl.util.tty.color as color
# Use this to strip escape sequences
_escape = re.compile(r'\x1b[^m]*m|\x1b\[?1034h')
# control characters for enabling/disabling echo
#
# We use control characters to ensure that echo enable/disable are inline
# with the other output. We always follow these with a newline to ensure
# one per line the following newline is ignored in output.
xon, xoff = '\x11\n', '\x13\n'
control = re.compile('(\x11\n|\x13\n)')
def _strip(line):
"""Strip color and control characters from a line."""
return _escape.sub('', line)
class _SkipWithBlock():
"""Special exception class used to skip a with block."""
pass
class keyboard_input(object):
"""Disable canonical input and echo on a stream within a with block.
"""Context manager to disable line editing and echoing.
Use this with ``sys.stdin`` for keyboard input, e.g.::
@ -57,14 +58,33 @@ class keyboard_input(object):
r, w, x = select.select([sys.stdin], [], [])
# ... do something with keypresses ...
When the with block completes, this will restore settings before
canonical and echo were disabled.
"""
This disables canonical input so that keypresses are available on the
stream immediately. Typically standard input allows line editing,
which means keypresses won't be sent until the user hits return.
It also disables echoing, so that keys pressed aren't printed to the
terminal. So, the user can hit, e.g., 'v', and it's read on the
other end of the pipe immediately but not printed.
When the with block completes, prior TTY settings are restored.
Note: this depends on termios support. If termios isn't available,
or if the stream isn't a TTY, this context manager has no effect.
"""
def __init__(self, stream):
"""Create a context manager that will enable keyboard input on stream.
Args:
stream (file-like): stream on which to accept keyboard input
"""
self.stream = stream
def __enter__(self):
"""Enable immediate keypress input on stream.
If the stream is not a TTY or the system doesn't support termios,
do nothing.
"""
self.old_cfg = None
# Ignore all this if the input stream is not a tty.
@ -72,7 +92,7 @@ def __enter__(self):
return
try:
# import and mark whether it worked.
# If this fails, self.old_cfg will remain None
import termios
# save old termios settings
@ -89,11 +109,10 @@ def __enter__(self):
termios.tcsetattr(fd, termios.TCSADRAIN, self.new_cfg)
except Exception:
pass # Some OS's do not support termios, so ignore.
pass # some OS's do not support termios, so ignore
def __exit__(self, exc_type, exception, traceback):
# If termios was avaialble, restore old settings after the
# with block
"""If termios was avaialble, restore old settings."""
if self.old_cfg:
import termios
termios.tcsetattr(
@ -101,168 +120,222 @@ def __exit__(self, exc_type, exception, traceback):
class log_output(object):
"""Spawns a daemon that reads from a pipe and writes to a file
"""Context manager that logs its output to a file.
Usage::
In the simplest case, the usage looks like this::
# Spawns the daemon
with log_output('logfile.txt', 'w') as log_redirection:
# do things ... output is not redirected
with log_redirection:
with log_output('logfile.txt'):
# do things ... output will be logged
or::
Any output from the with block will be redirected to ``logfile.txt``.
If you also want the output to be echoed to ``stdout``, use the
``echo`` parameter::
with log_output('logfile.txt', echo=True) as log_redirection:
# do things ... output is not redirected
with log_redirection:
with log_output('logfile.txt', echo=True):
# do things ... output will be logged and printed out
And, if you just want to echo *some* stuff from the parent, use
``force_echo``:
with log_output('logfile.txt', echo=False) as logger:
# do things ... output will be logged
# and also printed to stdout.
Opens a stream in 'w' mode at daemon spawning and closes it at
daemon joining. If echo is True, also prints the output to stdout.
with logger.force_echo():
# things here will be echoed *and* logged
Under the hood, we spawn a daemon and set up a pipe between this
process and the daemon. The daemon writes our output to both the
file and to stdout (if echoing). The parent process can communicate
with the daemon to tell it when and when not to echo; this is what
force_echo does. You can also enable/disable echoing by typing 'v'.
We try to use OS-level file descriptors to do the redirection, but if
stdout or stderr has been set to some Python-level file object, we
use Python-level redirection instead. This allows the redirection to
work within test frameworks like nose and pytest.
"""
def __init__(
self,
filename,
echo=False,
force_color=False,
debug=False,
input_stream=sys.stdin
):
def __init__(self, filename, echo=False, debug=False):
"""Create a new output log context manager.
Logger daemon is not started until ``__enter__()``.
"""
self.filename = filename
# Various output options
self.echo = echo
self.force_color = force_color
self.debug = debug
# Default is to try file-descriptor reassignment unless the system
# out/err streams do not have an associated file descriptor
self.directAssignment = False
self.read, self.write = os.pipe()
# Needed to un-summon the daemon
self.parent_pipe, self.child_pipe = multiprocessing.Pipe()
# Input stream that controls verbosity interactively
self.input_stream = input_stream
self._active = False # used to prevent re-entry
def __enter__(self):
if self._active:
raise RuntimeError("Can't re-enter the same log_output!")
# record parent color settings before redirecting. We do this
# because color output depends on whether the *original* stdout
# is a TTY. New stdout won't be a TTY so we force colorization.
self._saved_color = tty.color._force_color
forced_color = tty.color.get_color_when()
# also record parent debug settings -- in case the logger is
# forcing debug output.
self._saved_debug = tty._debug
# OS-level pipe for redirecting output to logger
self.read_fd, self.write_fd = os.pipe()
# Sets a daemon that writes to file what it reads from a pipe
try:
fwd_input_stream = lang.duplicate_stream(self.input_stream)
self.p = multiprocessing.Process(
target=self._spawn_writing_daemon,
args=(self.read, fwd_input_stream),
name='logger_daemon'
)
self.p.daemon = True
self.p.start()
# need to pass this b/c multiprocessing closes stdin in child.
input_stream = os.fdopen(os.dup(sys.stdin.fileno()))
self.process = multiprocessing.Process(
target=self._writer_daemon, args=(input_stream,))
self.process.daemon = True # must set before start()
self.process.start()
os.close(self.read_fd) # close in the parent process
finally:
fwd_input_stream.close()
return log_output.OutputRedirection(self)
input_stream.close()
# Flush immediately before redirecting so that anything buffered
# goes to the original stream
sys.stdout.flush()
sys.stderr.flush()
# Now do the actual output rediction.
self.use_fds = True
try:
# We try first to use OS-level file descriptors, as this
# redirects output for subprocesses and system calls.
# Save old stdout and stderr file descriptors
self._saved_stdout = os.dup(sys.stdout.fileno())
self._saved_stderr = os.dup(sys.stderr.fileno())
# redirect to the pipe we created above
os.dup2(self.write_fd, sys.stdout.fileno())
os.dup2(self.write_fd, sys.stderr.fileno())
os.close(self.write_fd)
except AttributeError:
# Using file descriptors can fail if stdout and stderr don't
# have a fileno attribute. This can happen, when, e.g., the
# test framework replaces stdout with a StringIO object. We
# handle thi the Python way. This won't redirect lower-level
# output, but it's the best we can do.
self.use_fds = False
# Save old stdout and stderr file objects
self._saved_stdout = sys.stdout
self._saved_stderr = sys.stderr
# create a file object for the pipe; redirect to it.
pipe_fd_out = os.fdopen(self.write_fd, 'w')
sys.stdout = pipe_fd_out
sys.stderr = pipe_fd_out
# Force color and debug settings now that we have redirected.
tty.color.set_color_when(forced_color)
tty._debug = self.debug
# track whether we're currently inside this log_output
self._active = True
# return this log_output object so that the user can do things
# like temporarily echo some ouptut.
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.parent_pipe.send(True)
self.p.join(60.0) # 1 minute to join the child
# Flush any buffered output to the logger daemon.
sys.stdout.flush()
sys.stderr.flush()
def _spawn_writing_daemon(self, read, input_stream):
# This is the Parent: read from child, skip the with block.
# restore previous output settings, either the low-level way or
# the python way
if self.use_fds:
os.dup2(self._saved_stdout, sys.stdout.fileno())
os.close(self._saved_stdout)
os.dup2(self._saved_stderr, sys.stderr.fileno())
os.close(self._saved_stderr)
else:
sys.stdout = self._saved_stdout
sys.stderr = self._saved_stderr
# join the daemon process. The daemon will quit automatically
# when the write pipe is closed; we just wait for it here.
self.process.join()
# restore old color and debug settings
tty.color._force_color = self._saved_color
tty._debug = self._saved_debug
self._active = False # safe to enter again
@contextmanager
def force_echo(self):
"""Context manager to force local echo, even if echo is off."""
if not self._active:
raise RuntimeError(
"Can't call force_echo() outside log_output region!")
# This uses the xon/xoff to highlight regions to be echoed in the
# output. We us these control characters rather than, say, a
# separate pipe, because they're in-band and assured to appear
# exactly before and after the text we want to echo.
sys.stdout.write(xon)
sys.stdout.flush()
yield
sys.stdout.write(xoff)
sys.stdout.flush()
def _writer_daemon(self, stdin):
"""Daemon that writes output to the log file and stdout."""
# Use line buffering (3rd param = 1) since Python 3 has a bug
# that prevents unbuffered text I/O.
read_file = os.fdopen(read, 'r', 1)
in_pipe = os.fdopen(self.read_fd, 'r', 1)
os.close(self.write_fd)
echo = self.echo # initial echo setting, user-controllable
force_echo = False # parent can force echo for certain output
with open(self.filename, 'w') as log_file:
with keyboard_input(input_stream):
with keyboard_input(stdin):
while True:
# Without the last parameter (timeout) select will wait
# until at least one of the two streams are ready. This
# may cause the function to hang.
rlist, _, _ = select.select(
[read_file, input_stream], [], [], 0
)
# Without the last parameter (timeout) select will
# wait until at least one of the two streams are
# ready. This may cause the function to hang.
rlist, _, xlist = select.select(
[in_pipe, stdin], [], [], 0)
# Allow user to toggle echo with 'v' key.
# Currently ignores other chars.
if input_stream in rlist:
if input_stream.read(1) == 'v':
self.echo = not self.echo
if stdin in rlist:
if stdin.read(1) == 'v':
echo = not echo
# Handle output from the with block process.
if read_file in rlist:
# If we arrive here it means that
# read_file was ready for reading : it
# should never happen that line is false-ish
line = read_file.readline()
if in_pipe in rlist:
# If we arrive here it means that in_pipe was
# ready for reading : it should never happen that
# line is false-ish
line = in_pipe.readline()
if not line:
break # EOF
# Echo to stdout if requested.
if self.echo:
# find control characters and strip them.
controls = control.findall(line)
line = re.sub(control, '', line)
# Echo to stdout if requested or forced
if echo or force_echo:
sys.stdout.write(line)
# Stripped output to log file.
log_file.write(_strip(line))
log_file.flush()
if self.child_pipe.poll():
break
def __del__(self):
"""Closes the pipes"""
os.close(self.write)
os.close(self.read)
class OutputRedirection(object):
def __init__(self, other):
self.__dict__.update(other.__dict__)
def __enter__(self):
"""Redirect output from the with block to a file.
Hijacks stdout / stderr and writes to the pipe
connected to the logger daemon
"""
# remember these values for later.
self._force_color = color._force_color
self._debug = tty._debug
# Redirect this output to a pipe
write = self.write
try:
# Save old stdout and stderr
self._stdout = os.dup(sys.stdout.fileno())
self._stderr = os.dup(sys.stderr.fileno())
# redirect to the pipe.
os.dup2(write, sys.stdout.fileno())
os.dup2(write, sys.stderr.fileno())
except AttributeError:
self.directAssignment = True
self._stdout = sys.stdout
self._stderr = sys.stderr
output_redirect = os.fdopen(write, 'w')
sys.stdout = output_redirect
sys.stderr = output_redirect
if self.force_color:
color._force_color = True
if self.debug:
tty._debug = True
def __exit__(self, exc_type, exception, traceback):
"""Plugs back the original file descriptors
for stdout and stderr
"""
# Flush the log to disk.
sys.stdout.flush()
sys.stderr.flush()
if self.directAssignment:
# We seem to need this only to pass test/install.py
sys.stdout = self._stdout
sys.stderr = self._stderr
else:
os.dup2(self._stdout, sys.stdout.fileno())
os.dup2(self._stderr, sys.stderr.fileno())
# restore output options.
color._force_color = self._force_color
tty._debug = self._debug
if xon in controls:
force_echo = True
if xoff in controls:
force_echo = False

View File

@ -54,13 +54,11 @@
import inspect
import multiprocessing
import os
import errno
import shutil
import sys
import traceback
from six import iteritems
import llnl.util.lang as lang
import llnl.util.tty as tty
from llnl.util.filesystem import *
@ -536,21 +534,30 @@ def child_fun():
control over the environment, etc. without affecting other builds
that might be executed in the same spack call.
If something goes wrong, the child process is expected to print the
error and the parent process will exit with error as well. If things
go well, the child exits and the parent carries on.
If something goes wrong, the child process catches the error and
passes it to the parent wrapped in a ChildError. The parent is
expected to handle (or re-raise) the ChildError.
"""
def child_execution(child_connection, input_stream):
def child_process(child_pipe, input_stream):
# We are in the child process. Python sets sys.stdin to
# open(os.devnull) to prevent our process and its parent from
# simultaneously reading from the original stdin. But, we assume
# that the parent process is not going to read from it till we
# are done with the child, so we undo Python's precaution.
if input_stream is not None:
sys.stdin = input_stream
try:
setup_package(pkg, dirty=dirty)
function(input_stream)
child_connection.send(None)
function()
child_pipe.send(None)
except StopIteration as e:
# StopIteration is used to stop installations
# before the final stage, mainly for debug purposes
tty.msg(e.message)
child_connection.send(None)
child_pipe.send(None)
except:
# catch ANYTHING that goes wrong in the child process
exc_type, exc, tb = sys.exc_info()
@ -569,34 +576,29 @@ def child_execution(child_connection, input_stream):
# make a pickleable exception to send to parent.
msg = "%s: %s" % (str(exc_type.__name__), str(exc))
ce = ChildError(msg, tb_string, build_log, package_context)
child_connection.send(ce)
child_pipe.send(ce)
finally:
child_connection.close()
child_pipe.close()
parent_connection, child_connection = multiprocessing.Pipe()
parent_pipe, child_pipe = multiprocessing.Pipe()
input_stream = None
try:
# Forward sys.stdin to be able to activate / deactivate
# verbosity pressing a key at run-time. When sys.stdin can't
# be duplicated (e.g. running under nohup, which results in an
# '[Errno 22] Invalid argument') then just use os.devnull
try:
input_stream = lang.duplicate_stream(sys.stdin)
except OSError as e:
if e.errno == errno.EINVAL:
tty.debug("Using devnull as input_stream")
input_stream = open(os.devnull)
# Forward sys.stdin when appropriate, to allow toggling verbosity
if sys.stdin.isatty() and hasattr(sys.stdin, 'fileno'):
input_stream = os.fdopen(os.dup(sys.stdin.fileno()))
p = multiprocessing.Process(
target=child_execution,
args=(child_connection, input_stream)
)
target=child_process, args=(child_pipe, input_stream))
p.start()
finally:
# Close the input stream in the parent process
if input_stream is not None:
input_stream.close()
child_exc = parent_connection.recv()
child_exc = parent_pipe.recv()
p.join()
if child_exc is not None:

View File

@ -186,10 +186,9 @@ def _flush_callbacks(check_name):
# Clear the attribute for the next class
setattr(mcs, attr_name, {})
# Preconditions
_flush_callbacks('run_before')
# Sanity checks
_flush_callbacks('run_after')
return super(PackageMeta, mcs).__new__(mcs, name, bases, attr_dict)
@staticmethod
@ -1275,23 +1274,10 @@ def do_install(self,
self.make_jobs = make_jobs
# Then install the package itself.
def build_process(input_stream):
def build_process():
"""Forked for each build. Has its own process and python
module space set up by build_environment.fork()."""
# We are in the child process. This means that our sys.stdin is
# equal to open(os.devnull). Python did this to prevent our process
# and the parent process from possible simultaneous reading from
# the original standard input. But we assume that the parent
# process is not going to read from it till we are done here,
# otherwise it should not have passed us the copy of the stream.
# Thus, we are free to work with the the copy (input_stream)
# however we want. For example, we might want to call functions
# (e.g. input()) that implicitly read from whatever stream is
# assigned to sys.stdin. Since we want them to work with the
# original input stream, we are making the following assignment:
sys.stdin = input_stream
start_time = time.time()
if not fake:
if not skip_patch:
@ -1328,23 +1314,20 @@ def build_process(input_stream):
# Spawn a daemon that reads from a pipe and redirects
# everything to log_path
redirection_context = log_output(
log_path,
with log_output(log_path,
echo=verbose,
force_color=sys.stdout.isatty(),
debug=True,
input_stream=input_stream
)
with redirection_context as log_redirection:
for phase_name, phase in zip(
debug=True) as logger:
for phase_name, phase_attr in zip(
self.phases, self._InstallPhase_phases):
tty.msg(
'Executing phase : \'{0}\''.format(phase_name)
)
with logger.force_echo():
tty.msg("Executing phase: '%s'" % phase_name)
# Redirect stdout and stderr to daemon pipe
with log_redirection:
getattr(self, phase)(
self.spec, self.prefix)
phase = getattr(self, phase_attr)
phase(self.spec, self.prefix)
self.log()
# Run post install hooks before build stage is removed.
spack.hooks.post_install(self.spec)
@ -1363,8 +1346,10 @@ def build_process(input_stream):
# Create the install prefix and fork the build process.
if not os.path.exists(self.prefix):
spack.store.layout.create_install_directory(self.spec)
# Fork a child to do the actual installation
spack.build_environment.fork(self, build_process, dirty=dirty)
# If we installed then we should keep the prefix
keep_prefix = self.last_phase is None or keep_prefix
# note: PARENT of the build process adds the new package to
@ -1439,12 +1424,9 @@ def _do_install_pop_kwargs(self, kwargs):
def log(self):
# Copy provenance into the install directory on success
log_install_path = spack.store.layout.build_log_path(
self.spec)
env_install_path = spack.store.layout.build_env_path(
self.spec)
packages_dir = spack.store.layout.build_packages_path(
self.spec)
log_install_path = spack.store.layout.build_log_path(self.spec)
env_install_path = spack.store.layout.build_env_path(self.spec)
packages_dir = spack.store.layout.build_packages_path(self.spec)
# Remove first if we're overwriting another build
# (can happen with spack setup)

View File

@ -27,10 +27,7 @@
import os
import re
import shutil
from six import StringIO
import llnl.util.filesystem
import llnl.util.lang
import ordereddict_backport
import py
@ -52,17 +49,6 @@
##########
# Monkey-patching that is applied to all tests
##########
@pytest.fixture(autouse=True)
def no_stdin_duplication(monkeypatch):
"""Duplicating stdin (or any other stream) returns an empty
StringIO object.
"""
monkeypatch.setattr(llnl.util.lang, 'duplicate_stream',
lambda x: StringIO())
@pytest.fixture(autouse=True)
def mock_fetch_cache(monkeypatch):
"""Substitutes spack.fetch_cache with a mock object that does nothing