Environment path sanitization and sourcing (#8476)

Add two functions to the EnvironmentModifications object to help
users sanitize environment variables in their package definitions:

* deprioritize_system_paths: this keeps system paths in the
  environment variable but moves them to the end.
* prune_duplicate_paths: remove any duplicate paths from the
  variable

This includes testing for the new functions as well as for
(previously-untested) old convenience functions for environment
variable manipulation.

This also adds special handling for bash functions so they
will be defined when the exported environment file is sourced.
This commit is contained in:
Chris Green 2019-05-06 21:29:18 -05:00 committed by Peter Scheibel
parent 6de3e1c7b6
commit 01eaca607f
3 changed files with 208 additions and 7 deletions

View File

@ -62,9 +62,14 @@ def prepare_environment_for_tests():
os.environ['UNSET_ME'] = 'foo' os.environ['UNSET_ME'] = 'foo'
os.environ['EMPTY_PATH_LIST'] = '' os.environ['EMPTY_PATH_LIST'] = ''
os.environ['PATH_LIST'] = '/path/second:/path/third' os.environ['PATH_LIST'] = '/path/second:/path/third'
os.environ['REMOVE_PATH_LIST'] = '/a/b:/duplicate:/a/c:/remove/this:/a/d:/duplicate/:/f/g' # NOQA: ignore=E501 os.environ['REMOVE_PATH_LIST'] \
= '/a/b:/duplicate:/a/c:/remove/this:/a/d:/duplicate/:/f/g'
os.environ['PATH_LIST_WITH_SYSTEM_PATHS'] \
= '/usr/include:' + os.environ['REMOVE_PATH_LIST']
os.environ['PATH_LIST_WITH_DUPLICATES'] = os.environ['REMOVE_PATH_LIST']
yield yield
for x in ('UNSET_ME', 'EMPTY_PATH_LIST', 'PATH_LIST', 'REMOVE_PATH_LIST'): for x in ('UNSET_ME', 'EMPTY_PATH_LIST', 'PATH_LIST', 'REMOVE_PATH_LIST',
'PATH_LIST_WITH_SYSTEM_PATHS', 'PATH_LIST_WITH_DUPLICATES'):
if x in os.environ: if x in os.environ:
del os.environ[x] del os.environ[x]
@ -203,6 +208,9 @@ def test_path_manipulation(env):
env.remove_path('REMOVE_PATH_LIST', '/remove/this') env.remove_path('REMOVE_PATH_LIST', '/remove/this')
env.remove_path('REMOVE_PATH_LIST', '/duplicate/') env.remove_path('REMOVE_PATH_LIST', '/duplicate/')
env.deprioritize_system_paths('PATH_LIST_WITH_SYSTEM_PATHS')
env.prune_duplicate_paths('PATH_LIST_WITH_DUPLICATES')
env.apply_modifications() env.apply_modifications()
expected = '/path/first:/path/second:/path/third:/path/last' expected = '/path/first:/path/second:/path/third:/path/last'
@ -216,6 +224,12 @@ def test_path_manipulation(env):
assert os.environ['REMOVE_PATH_LIST'] == '/a/b:/a/c:/a/d:/f/g' assert os.environ['REMOVE_PATH_LIST'] == '/a/b:/a/c:/a/d:/f/g'
assert not os.environ['PATH_LIST_WITH_SYSTEM_PATHS'].\
startswith('/usr/include:')
assert os.environ['PATH_LIST_WITH_SYSTEM_PATHS'].endswith(':/usr/include')
assert os.environ['PATH_LIST_WITH_DUPLICATES'].count('/duplicate') == 1
def test_extra_arguments(env): def test_extra_arguments(env):
"""Tests that we can attach extra arguments to any command.""" """Tests that we can attach extra arguments to any command."""

View File

@ -0,0 +1,105 @@
# Copyright 2013-2019 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
"""Test Spack's environment utility functions."""
import pytest
import os
import spack.util.environment as envutil
@pytest.fixture()
def prepare_environment_for_tests():
if 'TEST_ENV_VAR' in os.environ:
del os.environ['TEST_ENV_VAR']
yield
del os.environ['TEST_ENV_VAR']
def test_is_system_path():
assert(envutil.is_system_path('/usr/bin'))
assert(not envutil.is_system_path('/nonsense_path/bin'))
test_paths = ['/usr/bin',
'/nonsense_path/lib',
'/usr/local/lib',
'/bin',
'/nonsense_path/extra/bin',
'/usr/lib64']
def test_filter_system_paths():
expected = [p for p in test_paths if p.startswith('/nonsense_path')]
filtered = envutil.filter_system_paths(test_paths)
assert(expected == filtered)
def deprioritize_system_paths():
expected = [p for p in test_paths if p.startswith('/nonsense_path')]
expected.extend([p for p in test_paths
if not p.startswith('/nonsense_path')])
filtered = envutil.deprioritize_system_paths(test_paths)
assert(expected == filtered)
def test_prune_duplicate_paths():
test_paths = ['/a/b', '/a/c', '/a/b', '/a/a', '/a/c', '/a/a/..']
expected = ['/a/b', '/a/c', '/a/a', '/a/a/..']
assert(expected == envutil.prune_duplicate_paths(test_paths))
def test_get_path(prepare_environment_for_tests):
os.environ['TEST_ENV_VAR'] = '/a:/b:/c/d'
expected = ['/a', '/b', '/c/d']
assert(envutil.get_path('TEST_ENV_VAR') == expected)
def test_env_flag(prepare_environment_for_tests):
assert(not envutil.env_flag('TEST_NO_ENV_VAR'))
os.environ['TEST_ENV_VAR'] = '1'
assert(envutil.env_flag('TEST_ENV_VAR'))
os.environ['TEST_ENV_VAR'] = 'TRUE'
assert(envutil.env_flag('TEST_ENV_VAR'))
os.environ['TEST_ENV_VAR'] = 'True'
assert(envutil.env_flag('TEST_ENV_VAR'))
os.environ['TEST_ENV_VAR'] = 'TRue'
assert(envutil.env_flag('TEST_ENV_VAR'))
os.environ['TEST_ENV_VAR'] = 'true'
assert(envutil.env_flag('TEST_ENV_VAR'))
os.environ['TEST_ENV_VAR'] = '27'
assert(not envutil.env_flag('TEST_ENV_VAR'))
os.environ['TEST_ENV_VAR'] = '-2.3'
assert(not envutil.env_flag('TEST_ENV_VAR'))
os.environ['TEST_ENV_VAR'] = '0'
assert(not envutil.env_flag('TEST_ENV_VAR'))
os.environ['TEST_ENV_VAR'] = 'False'
assert(not envutil.env_flag('TEST_ENV_VAR'))
os.environ['TEST_ENV_VAR'] = 'false'
assert(not envutil.env_flag('TEST_ENV_VAR'))
os.environ['TEST_ENV_VAR'] = 'garbage'
assert(not envutil.env_flag('TEST_ENV_VAR'))
def test_path_set(prepare_environment_for_tests):
envutil.path_set('TEST_ENV_VAR', ['/a', '/a/b', '/a/a'])
assert(os.environ['TEST_ENV_VAR'] == '/a:/a/b:/a/a')
def test_path_put_first(prepare_environment_for_tests):
envutil.path_set('TEST_ENV_VAR', test_paths)
expected = ['/usr/bin', '/new_nonsense_path/a/b']
expected.extend([p for p in test_paths if p != '/usr/bin'])
envutil.path_put_first('TEST_ENV_VAR', expected)
assert(envutil.get_path('TEST_ENV_VAR') == expected)
def test_dump_environment(prepare_environment_for_tests, tmpdir):
test_paths = '/a:/b/x:/b/c'
os.environ['TEST_ENV_VAR'] = test_paths
dumpfile_path = str(tmpdir.join('envdump.txt'))
envutil.dump_environment(dumpfile_path)
with open(dumpfile_path, 'r') as dumpfile:
assert('TEST_ENV_VAR={0}; export TEST_ENV_VAR\n'.format(test_paths)
in list(dumpfile))

View File

@ -18,6 +18,8 @@
from llnl.util.lang import dedupe from llnl.util.lang import dedupe
from six.moves import shlex_quote as cmd_quote
from six.moves import cPickle
system_paths = ['/', '/usr', '/usr/local'] system_paths = ['/', '/usr', '/usr/local']
suffixes = ['bin', 'bin64', 'include', 'lib', 'lib64'] suffixes = ['bin', 'bin64', 'include', 'lib', 'lib64']
@ -51,9 +53,22 @@ def is_system_path(path):
def filter_system_paths(paths): def filter_system_paths(paths):
"""Return only paths that are not system paths."""
return [p for p in paths if not is_system_path(p)] return [p for p in paths if not is_system_path(p)]
def deprioritize_system_paths(paths):
"""Put system paths at the end of paths, otherwise preserving order."""
filtered_paths = filter_system_paths(paths)
fp = set(filtered_paths)
return filtered_paths + [p for p in paths if p not in fp]
def prune_duplicate_paths(paths):
"""Returns the paths with duplicates removed, order preserved."""
return list(dedupe(paths))
def get_path(name): def get_path(name):
path = os.environ.get(name, "").strip() path = os.environ.get(name, "").strip()
if path: if path:
@ -88,11 +103,32 @@ def path_put_first(var_name, directories):
path_set(var_name, new_path) path_set(var_name, new_path)
def dump_environment(path): bash_function_finder = re.compile(r'BASH_FUNC_(.*?)\(\)')
"""Dump the current environment out to a file."""
def env_var_to_source_line(var, val):
if var.startswith('BASH_FUNC'):
source_line = 'function {fname}{decl}; export -f {fname}'.\
format(fname=bash_function_finder.sub(r'\1', var),
decl=val)
else:
source_line = '{var}={val}; export {var}'.format(var=var,
val=cmd_quote(val))
return source_line
def dump_environment(path, environment=None):
"""Dump an environment dictionary to a source-able file."""
use_env = environment if environment else os.environ
with open(path, 'w') as env_file: with open(path, 'w') as env_file:
for key, val in sorted(os.environ.items()): for var, val in sorted(use_env.items()):
env_file.write('export %s="%s"\n' % (key, val)) env_file.write('{0}\n'.format(env_var_to_source_line(var, val)))
def pickle_environment(path, environment=None):
"""Pickle an environment dictionary to a file."""
cPickle.dump(dict(environment if environment else os.environ),
open(path, 'wb'), protocol=2)
@contextlib.contextmanager @contextlib.contextmanager
@ -126,7 +162,9 @@ class NameModifier(object):
def __init__(self, name, **kwargs): def __init__(self, name, **kwargs):
self.name = name self.name = name
self.args = {'name': name} self.separator = kwargs.get('separator', ':')
self.args = {'name': name, 'separator': self.separator}
self.args.update(kwargs) self.args.update(kwargs)
def update_args(self, **kwargs): def update_args(self, **kwargs):
@ -208,6 +246,28 @@ def execute(self, env):
env[self.name] = self.separator.join(directories) env[self.name] = self.separator.join(directories)
class DeprioritizeSystemPaths(NameModifier):
def execute(self, env):
environment_value = env.get(self.name, '')
directories = environment_value.split(
self.separator) if environment_value else []
directories = deprioritize_system_paths([os.path.normpath(x)
for x in directories])
env[self.name] = self.separator.join(directories)
class PruneDuplicatePaths(NameModifier):
def execute(self, env):
environment_value = env.get(self.name, '')
directories = environment_value.split(
self.separator) if environment_value else []
directories = prune_duplicate_paths([os.path.normpath(x)
for x in directories])
env[self.name] = self.separator.join(directories)
class EnvironmentModifications(object): class EnvironmentModifications(object):
"""Keeps track of requests to modify the current environment. """Keeps track of requests to modify the current environment.
@ -338,6 +398,28 @@ def remove_path(self, name, path, **kwargs):
item = RemovePath(name, path, **kwargs) item = RemovePath(name, path, **kwargs)
self.env_modifications.append(item) self.env_modifications.append(item)
def deprioritize_system_paths(self, name, **kwargs):
"""Stores a request to deprioritize system paths in a path list,
otherwise preserving the order.
Args:
name: name of the path list in the environment.
"""
kwargs.update(self._get_outside_caller_attributes())
item = DeprioritizeSystemPaths(name, **kwargs)
self.env_modifications.append(item)
def prune_duplicate_paths(self, name, **kwargs):
"""Stores a request to remove duplicates from a path list, otherwise
preserving the order.
Args:
name: name of the path list in the environment.
"""
kwargs.update(self._get_outside_caller_attributes())
item = PruneDuplicatePaths(name, **kwargs)
self.env_modifications.append(item)
def group_by_name(self): def group_by_name(self):
"""Returns a dict of the modifications grouped by variable name. """Returns a dict of the modifications grouped by variable name.