commands: use a single ThreadPool for spack versions (#16749)

This fixes a fork bomb in `spack versions`. Recursive generation of pools
to scrape URLs in `_spider` was creating large numbers of processes.
Instead of recursively creating process pools, we now use a single
`ThreadPool` with a concurrency limit.

More on the issue: ~10 users running `spack versions` at the same time on
front-end nodes caused a kernel lockup due to the high number of sockets
opened (sys-admins reported ~210k distributed over 3 nodes). The users were
internal, so they had `ulimit -n` set to ~70k.

The forking behavior could be observed by just running:

    $ spack versions boost

and checking the number of processes spawned. The number of processes per
se was not the issue, but each of them opens a socket, which can stress
`iptables`.

In the original issue the kernel watchdog was reporting:

    Message from syslogd@login03 at May 19 12:01:30 ...
    kernel:Watchdog CPU:110 Hard LOCKUP
    Message from syslogd@login03 at May 19 12:01:31 ...
    kernel:watchdog: BUG: soft lockup - CPU#110 stuck for 23s! [python3:2756]
    Message from syslogd@login03 at May 19 12:01:31 ...
    kernel:watchdog: BUG: soft lockup - CPU#94 stuck for 22s! [iptables:5603]
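
The gist of the fix (the actual implementation is in the `spack.util.web`
diff below) is to traverse links breadth-first, one level at a time, and to
reuse a single `ThreadPool` for every level instead of spawning a new pool
per page, so the number of simultaneous requests is bounded by the pool
size. A minimal sketch of that pattern, with illustrative names only
(`crawl` and `fetch_one` are not Spack functions):

    import multiprocessing.pool


    def crawl(fetch_one, root_urls, depth=0, concurrency=32):
        """Breadth-first crawl of root_urls using one shared ThreadPool.

        fetch_one(url) must return (text, out_links) for a single page; it
        stands in for the real page fetcher and link parser.
        """
        visited = set(root_urls)
        pages, links = {}, set()
        frontier = list(root_urls)

        pool = multiprocessing.pool.ThreadPool(processes=concurrency)
        try:
            for level in range(depth + 1):
                results = pool.map(fetch_one, frontier)
                next_frontier = []
                for url, (text, out_links) in zip(frontier, results):
                    pages[url] = text
                    links.update(out_links)
                    if level < depth:
                        # only schedule links we have not seen yet
                        new = [u for u in out_links if u not in visited]
                        visited.update(new)
                        next_frontier.extend(new)
                frontier = next_frontier
                if not frontier:
                    break
        finally:
            pool.terminate()
            pool.join()

        return pages, links
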
Author:  Massimiliano Culpo
Date:    2020-06-05 09:08:32 +02:00
Commit:  5b272e3ff3 (parent 92e24950e5), committed via GitHub

5 changed files with 218 additions and 241 deletions

File 1 of 5: the `spack versions` command

@@ -21,6 +21,10 @@
 def setup_parser(subparser):
     subparser.add_argument('-s', '--safe-only', action='store_true',
                            help='only list safe versions of the package')
+    subparser.add_argument(
+        '-c', '--concurrency', default=32, type=int,
+        help='number of concurrent requests'
+    )
     arguments.add_common_arguments(subparser, ['package'])
@@ -45,7 +49,7 @@ def versions(parser, args):
     if sys.stdout.isatty():
         tty.msg('Remote versions (not yet checksummed):')

-    fetched_versions = pkg.fetch_remote_versions()
+    fetched_versions = pkg.fetch_remote_versions(args.concurrency)
     remote_versions = set(fetched_versions).difference(safe_versions)

     if not remote_versions:
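
With the new option the number of parallel requests can be tuned per
invocation, for example to be gentler on a shared login node:

    $ spack versions -c 8 boost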

File 2 of 5: the package base class (`fetch_remote_versions`)

@@ -2020,7 +2020,7 @@ def all_urls(self):
             urls.append(args['url'])
         return urls

-    def fetch_remote_versions(self):
+    def fetch_remote_versions(self, concurrency=128):
         """Find remote versions of this package.

         Uses ``list_url`` and any other URLs listed in the package file.
@@ -2033,7 +2033,8 @@ def fetch_remote_versions(self):
         try:
             return spack.util.web.find_versions_of_archive(
-                self.all_urls, self.list_url, self.list_depth)
+                self.all_urls, self.list_url, self.list_depth, concurrency
+            )
         except spack.util.web.NoNetworkConnectionError as e:
             tty.die("Package.fetch_versions couldn't connect to:", e.url,
                     e.message)
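
For reference, the same code path can be exercised from Python. A rough
sketch, assuming the `spack.repo.get` lookup that the `versions` command
itself uses (this snippet is not part of the diff):

    import spack.repo

    pkg = spack.repo.get('boost')
    # cap the scrape at 16 simultaneous requests
    remote_versions = pkg.fetch_remote_versions(16)
    print(sorted(remote_versions))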

File 3 of 5: unit tests for `spack.util.web`

@@ -2,125 +2,101 @@
 # Spack Project Developers. See the top-level COPYRIGHT file for details.
 #
 # SPDX-License-Identifier: (Apache-2.0 OR MIT)
+"""Tests for web.py."""
 import os

+import ordereddict_backport
 import pytest
-from ordereddict_backport import OrderedDict

 import spack.paths
-import spack.util.web as web_util
+import spack.util.web
 from spack.version import ver


-web_data_path = os.path.join(spack.paths.test_path, 'data', 'web')
+def _create_url(relative_url):
+    web_data_path = os.path.join(spack.paths.test_path, 'data', 'web')
+    return 'file://' + os.path.join(web_data_path, relative_url)

-root = 'file://' + web_data_path + '/index.html'
-root_tarball = 'file://' + web_data_path + '/foo-0.0.0.tar.gz'
-page_1 = 'file://' + os.path.join(web_data_path, '1.html')
-page_2 = 'file://' + os.path.join(web_data_path, '2.html')
-page_3 = 'file://' + os.path.join(web_data_path, '3.html')
-page_4 = 'file://' + os.path.join(web_data_path, '4.html')
-
-
-def test_spider_0():
-    pages, links = web_util.spider(root, depth=0)
-
-    assert root in pages
-    assert page_1 not in pages
-    assert page_2 not in pages
-    assert page_3 not in pages
-    assert page_4 not in pages
-
-    assert "This is the root page." in pages[root]
-
-    assert root not in links
-    assert page_1 in links
-    assert page_2 not in links
-    assert page_3 not in links
-    assert page_4 not in links
-
-
-def test_spider_1():
-    pages, links = web_util.spider(root, depth=1)
-
-    assert root in pages
-    assert page_1 in pages
-    assert page_2 not in pages
-    assert page_3 not in pages
-    assert page_4 not in pages
-
-    assert "This is the root page." in pages[root]
-    assert "This is page 1." in pages[page_1]
-
-    assert root not in links
-    assert page_1 in links
-    assert page_2 in links
-    assert page_3 not in links
-    assert page_4 not in links
-
-
-def test_spider_2():
-    pages, links = web_util.spider(root, depth=2)
-
-    assert root in pages
-    assert page_1 in pages
-    assert page_2 in pages
-    assert page_3 not in pages
-    assert page_4 not in pages
-
-    assert "This is the root page." in pages[root]
-    assert "This is page 1." in pages[page_1]
-    assert "This is page 2." in pages[page_2]
-
-    assert root not in links
-    assert page_1 in links
-    assert page_1 in links
-    assert page_2 in links
-    assert page_3 in links
-    assert page_4 in links
-
-
-def test_spider_3():
-    pages, links = web_util.spider(root, depth=3)
-
-    assert root in pages
-    assert page_1 in pages
-    assert page_2 in pages
-    assert page_3 in pages
-    assert page_4 in pages
-
-    assert "This is the root page." in pages[root]
-    assert "This is page 1." in pages[page_1]
-    assert "This is page 2." in pages[page_2]
-    assert "This is page 3." in pages[page_3]
-    assert "This is page 4." in pages[page_4]
-
-    assert root in links  # circular link on page 3
-    assert page_1 in links
-    assert page_1 in links
-    assert page_2 in links
-    assert page_3 in links
-    assert page_4 in links
+
+root = _create_url('index.html')
+root_tarball = _create_url('foo-0.0.0.tar.gz')
+page_1 = _create_url('1.html')
+page_2 = _create_url('2.html')
+page_3 = _create_url('3.html')
+page_4 = _create_url('4.html')
+
+
+@pytest.mark.parametrize(
+    'depth,expected_found,expected_not_found,expected_text', [
+        (0,
+         {'pages': [root], 'links': [page_1]},
+         {'pages': [page_1, page_2, page_3, page_4],
+          'links': [root, page_2, page_3, page_4]},
+         {root: "This is the root page."}),
+        (1,
+         {'pages': [root, page_1], 'links': [page_1, page_2]},
+         {'pages': [page_2, page_3, page_4],
+          'links': [root, page_3, page_4]},
+         {root: "This is the root page.",
+          page_1: "This is page 1."}),
+        (2,
+         {'pages': [root, page_1, page_2],
+          'links': [page_1, page_2, page_3, page_4]},
+         {'pages': [page_3, page_4], 'links': [root]},
+         {root: "This is the root page.",
+          page_1: "This is page 1.",
+          page_2: "This is page 2."}),
+        (3,
+         {'pages': [root, page_1, page_2, page_3, page_4],
+          'links': [root, page_1, page_2, page_3, page_4]},
+         {'pages': [], 'links': []},
+         {root: "This is the root page.",
+          page_1: "This is page 1.",
+          page_2: "This is page 2.",
+          page_3: "This is page 3.",
+          page_4: "This is page 4."}),
+    ])
+def test_spider(depth, expected_found, expected_not_found, expected_text):
+    pages, links = spack.util.web.spider(root, depth=depth)
+
+    for page in expected_found['pages']:
+        assert page in pages
+
+    for page in expected_not_found['pages']:
+        assert page not in pages
+
+    for link in expected_found['links']:
+        assert link in links
+
+    for link in expected_not_found['links']:
+        assert link not in links
+
+    for page, text in expected_text.items():
+        assert text in pages[page]
+
+
+def test_spider_no_response(monkeypatch):
+    # Mock the absence of a response
+    monkeypatch.setattr(
+        spack.util.web, 'read_from_url', lambda x, y: (None, None, None)
+    )
+    pages, links = spack.util.web.spider(root, depth=0)
+    assert not pages and not links


 def test_find_versions_of_archive_0():
-    versions = web_util.find_versions_of_archive(
+    versions = spack.util.web.find_versions_of_archive(
         root_tarball, root, list_depth=0)
     assert ver('0.0.0') in versions


 def test_find_versions_of_archive_1():
-    versions = web_util.find_versions_of_archive(
+    versions = spack.util.web.find_versions_of_archive(
         root_tarball, root, list_depth=1)
     assert ver('0.0.0') in versions
     assert ver('1.0.0') in versions


 def test_find_versions_of_archive_2():
-    versions = web_util.find_versions_of_archive(
+    versions = spack.util.web.find_versions_of_archive(
         root_tarball, root, list_depth=2)
     assert ver('0.0.0') in versions
     assert ver('1.0.0') in versions
@@ -128,14 +104,14 @@ def test_find_versions_of_archive_2():


 def test_find_exotic_versions_of_archive_2():
-    versions = web_util.find_versions_of_archive(
+    versions = spack.util.web.find_versions_of_archive(
         root_tarball, root, list_depth=2)
     # up for grabs to make this better.
     assert ver('2.0.0b2') in versions


 def test_find_versions_of_archive_3():
-    versions = web_util.find_versions_of_archive(
+    versions = spack.util.web.find_versions_of_archive(
         root_tarball, root, list_depth=3)
     assert ver('0.0.0') in versions
     assert ver('1.0.0') in versions
@@ -145,7 +121,7 @@ def test_find_versions_of_archive_3():


 def test_find_exotic_versions_of_archive_3():
-    versions = web_util.find_versions_of_archive(
+    versions = spack.util.web.find_versions_of_archive(
         root_tarball, root, list_depth=3)
     assert ver('2.0.0b2') in versions
     assert ver('3.0a1') in versions
@@ -159,35 +135,35 @@ def test_get_header():

     # looking up headers should just work like a plain dict
     # lookup when there is an entry with the right key
-    assert(web_util.get_header(headers, 'Content-type') == 'text/plain')
+    assert(spack.util.web.get_header(headers, 'Content-type') == 'text/plain')

     # looking up headers should still work if there is a fuzzy match
-    assert(web_util.get_header(headers, 'contentType') == 'text/plain')
+    assert(spack.util.web.get_header(headers, 'contentType') == 'text/plain')

     # ...unless there is an exact match for the "fuzzy" spelling.
     headers['contentType'] = 'text/html'
-    assert(web_util.get_header(headers, 'contentType') == 'text/html')
+    assert(spack.util.web.get_header(headers, 'contentType') == 'text/html')

     # If lookup has to fallback to fuzzy matching and there are more than one
     # fuzzy match, the result depends on the internal ordering of the given
     # mapping
-    headers = OrderedDict()
+    headers = ordereddict_backport.OrderedDict()
     headers['Content-type'] = 'text/plain'
     headers['contentType'] = 'text/html'

-    assert(web_util.get_header(headers, 'CONTENT_TYPE') == 'text/plain')
+    assert(spack.util.web.get_header(headers, 'CONTENT_TYPE') == 'text/plain')
     del headers['Content-type']
-    assert(web_util.get_header(headers, 'CONTENT_TYPE') == 'text/html')
+    assert(spack.util.web.get_header(headers, 'CONTENT_TYPE') == 'text/html')

     # Same as above, but different ordering
-    headers = OrderedDict()
+    headers = ordereddict_backport.OrderedDict()
     headers['contentType'] = 'text/html'
     headers['Content-type'] = 'text/plain'

-    assert(web_util.get_header(headers, 'CONTENT_TYPE') == 'text/html')
+    assert(spack.util.web.get_header(headers, 'CONTENT_TYPE') == 'text/html')
     del headers['contentType']
-    assert(web_util.get_header(headers, 'CONTENT_TYPE') == 'text/plain')
+    assert(spack.util.web.get_header(headers, 'CONTENT_TYPE') == 'text/plain')

     # If there isn't even a fuzzy match, raise KeyError
     with pytest.raises(KeyError):
-        web_util.get_header(headers, 'ContentLength')
+        spack.util.web.get_header(headers, 'ContentLength')

File 4 of 5: `spack.util.web` (the spider implementation)

@@ -7,17 +7,18 @@
 import codecs
 import errno
-import re
+import multiprocessing.pool
 import os
 import os.path
+import re
 import shutil
 import ssl
 import sys
 import traceback

-from six.moves.urllib.request import urlopen, Request
+import six
 from six.moves.urllib.error import URLError
-import multiprocessing.pool
+from six.moves.urllib.request import urlopen, Request

 try:
     # Python 2 had these in the HTMLParser package.
@@ -63,34 +64,6 @@ def handle_starttag(self, tag, attrs):
                 self.links.append(val)


-class NonDaemonProcess(multiprocessing.Process):
-    """Process that allows sub-processes, so pools can have sub-pools."""
-
-    @property
-    def daemon(self):
-        return False
-
-    @daemon.setter
-    def daemon(self, value):
-        pass
-
-
-if sys.version_info[0] < 3:
-    class NonDaemonPool(multiprocessing.pool.Pool):
-        """Pool that uses non-daemon processes"""
-        Process = NonDaemonProcess
-
-else:
-
-    class NonDaemonContext(type(multiprocessing.get_context())):  # novm
-        Process = NonDaemonProcess
-
-    class NonDaemonPool(multiprocessing.pool.Pool):
-        """Pool that uses non-daemon processes"""
-
-        def __init__(self, *args, **kwargs):
-            kwargs['context'] = NonDaemonContext()
-            super(NonDaemonPool, self).__init__(*args, **kwargs)
-
-
 def uses_ssl(parsed_url):
     if parsed_url.scheme == 'https':
         return True
@@ -336,109 +309,152 @@ def list_url(url):
                 for key in _iter_s3_prefix(s3, url)))


-def _spider(url, visited, root, depth, max_depth, raise_on_error):
-    """Fetches URL and any pages it links to up to max_depth.
-
-    depth should initially be zero, and max_depth is the max depth of
-    links to follow from the root.
-
-    Prints out a warning only if the root can't be fetched; it ignores
-    errors with pages that the root links to.
-
-    Returns a tuple of:
-    - pages: dict of pages visited (URL) mapped to their full text.
-    - links: set of links encountered while visiting the pages.
-    """
-    pages = {}     # dict from page URL -> text content.
-    links = set()  # set of all links seen on visited pages.
-
-    try:
-        response_url, _, response = read_from_url(url, 'text/html')
-        if not response_url or not response:
-            return pages, links
-
-        page = codecs.getreader('utf-8')(response).read()
-        pages[response_url] = page
-
-        # Parse out the links in the page
-        link_parser = LinkParser()
-        subcalls = []
-        link_parser.feed(page)
-
-        while link_parser.links:
-            raw_link = link_parser.links.pop()
-            abs_link = url_util.join(
-                response_url,
-                raw_link.strip(),
-                resolve_href=True)
-            links.add(abs_link)
-
-            # Skip stuff that looks like an archive
-            if any(raw_link.endswith(suf) for suf in ALLOWED_ARCHIVE_TYPES):
-                continue
-
-            # Skip things outside the root directory
-            if not abs_link.startswith(root):
-                continue
-
-            # Skip already-visited links
-            if abs_link in visited:
-                continue
-
-            # If we're not at max depth, follow links.
-            if depth < max_depth:
-                subcalls.append((abs_link, visited, root,
-                                 depth + 1, max_depth, raise_on_error))
-                visited.add(abs_link)
-
-        if subcalls:
-            pool = NonDaemonPool(processes=len(subcalls))
-            try:
-                results = pool.map(_spider_wrapper, subcalls)
-
-                for sub_pages, sub_links in results:
-                    pages.update(sub_pages)
-                    links.update(sub_links)
-
-            finally:
-                pool.terminate()
-                pool.join()
-
-    except URLError as e:
-        tty.debug(e)
-
-        if hasattr(e, 'reason') and isinstance(e.reason, ssl.SSLError):
-            tty.warn("Spack was unable to fetch url list due to a certificate "
-                     "verification problem. You can try running spack -k, "
-                     "which will not check SSL certificates. Use this at your "
-                     "own risk.")
-
-        if raise_on_error:
-            raise NoNetworkConnectionError(str(e), url)
-
-    except HTMLParseError as e:
-        # This error indicates that Python's HTML parser sucks.
-        msg = "Got an error parsing HTML."
-
-        # Pre-2.7.3 Pythons in particular have rather prickly HTML parsing.
-        if sys.version_info[:3] < (2, 7, 3):
-            msg += " Use Python 2.7.3 or newer for better HTML parsing."
-
-        tty.warn(msg, url, "HTMLParseError: " + str(e))
-
-    except Exception as e:
-        # Other types of errors are completely ignored, except in debug mode.
-        tty.debug("Error in _spider: %s:%s" % (type(e), e),
-                  traceback.format_exc())
+def spider(root_urls, depth=0, concurrency=32):
+    """Get web pages from root URLs.
+
+    If depth is specified (e.g., depth=2), then this will also follow
+    up to <depth> levels of links from each root.
+
+    Args:
+        root_urls (str or list of str): root urls used as a starting point
+            for spidering
+        depth (int): level of recursion into links
+        concurrency (int): number of simultaneous requests that can be sent
+
+    Returns:
+        A dict of pages visited (URL) mapped to their full text and the
+        set of visited links.
+    """
+    # Cache of visited links, meant to be captured by the closure below
+    _visited = set()
+
+    def _spider(url, collect_nested):
+        """Fetches URL and any pages it links to.
+
+        Prints out a warning only if the root can't be fetched; it ignores
+        errors with pages that the root links to.
+
+        Args:
+            url (str): url being fetched and searched for links
+            collect_nested (bool): whether we want to collect arguments
+                for nested spidering on the links found in this url
+
+        Returns:
+            A tuple of:
+            - pages: dict of pages visited (URL) mapped to their full text.
+            - links: set of links encountered while visiting the pages.
+            - spider_args: argument for subsequent call to spider
+        """
+        pages = {}     # dict from page URL -> text content.
+        links = set()  # set of all links seen on visited pages.
+        subcalls = []
+
+        try:
+            response_url, _, response = read_from_url(url, 'text/html')
+            if not response_url or not response:
+                return pages, links, subcalls
+
+            page = codecs.getreader('utf-8')(response).read()
+            pages[response_url] = page
+
+            # Parse out the links in the page
+            link_parser = LinkParser()
+            link_parser.feed(page)
+
+            while link_parser.links:
+                raw_link = link_parser.links.pop()
+                abs_link = url_util.join(
+                    response_url,
+                    raw_link.strip(),
+                    resolve_href=True)
+                links.add(abs_link)
+
+                # Skip stuff that looks like an archive
+                if any(raw_link.endswith(s) for s in ALLOWED_ARCHIVE_TYPES):
+                    continue
+
+                # Skip already-visited links
+                if abs_link in _visited:
+                    continue
+
+                # If we're not at max depth, follow links.
+                if collect_nested:
+                    subcalls.append((abs_link,))
+                    _visited.add(abs_link)
+
+        except URLError as e:
+            tty.debug(str(e))
+
+            if hasattr(e, 'reason') and isinstance(e.reason, ssl.SSLError):
+                tty.warn("Spack was unable to fetch url list due to a "
+                         "certificate verification problem. You can try "
+                         "running spack -k, which will not check SSL "
+                         "certificates. Use this at your own risk.")
+
+        except HTMLParseError as e:
+            # This error indicates that Python's HTML parser sucks.
+            msg = "Got an error parsing HTML."
+
+            # Pre-2.7.3 Pythons in particular have rather prickly HTML parsing.
+            if sys.version_info[:3] < (2, 7, 3):
+                msg += " Use Python 2.7.3 or newer for better HTML parsing."
+
+            tty.warn(msg, url, "HTMLParseError: " + str(e))
+
+        except Exception as e:
+            # Other types of errors are completely ignored,
+            # except in debug mode
+            tty.debug("Error in _spider: %s:%s" % (type(e), str(e)),
+                      traceback.format_exc())
+
+        finally:
+            tty.debug("SPIDER: [url={0}]".format(url))
+
+        return pages, links, subcalls
+
+    # TODO: Needed until we drop support for Python 2.X
+    def star(func):
+        def _wrapper(args):
+            return func(*args)
+        return _wrapper
+
+    if isinstance(root_urls, six.string_types):
+        root_urls = [root_urls]
+
+    # Clear the local cache of visited pages before starting the search
+    _visited.clear()
+
+    current_depth = 0
+    pages, links, spider_args = {}, set(), []
+
+    collect = current_depth < depth
+    for root in root_urls:
+        root = url_util.parse(root)
+        spider_args.append((root, collect))
+
+    tp = multiprocessing.pool.ThreadPool(processes=concurrency)
+    try:
+        while current_depth <= depth:
+            tty.debug("SPIDER: [depth={0}, max_depth={1}, urls={2}]".format(
+                current_depth, depth, len(spider_args))
+            )
+            results = tp.map(star(_spider), spider_args)
+            spider_args = []
+            collect = current_depth < depth
+            for sub_pages, sub_links, sub_spider_args in results:
+                sub_spider_args = [x + (collect,) for x in sub_spider_args]
+                pages.update(sub_pages)
+                links.update(sub_links)
+                spider_args.extend(sub_spider_args)
+            current_depth += 1
+    finally:
+        tp.terminate()
+        tp.join()

     return pages, links


-def _spider_wrapper(args):
-    """Wrapper for using spider with multiprocessing."""
-    return _spider(*args)
-
-
 def _urlopen(req, *args, **kwargs):
     """Wrapper for compatibility with old versions of Python."""
     url = req
@@ -460,37 +476,22 @@ def _urlopen(req, *args, **kwargs):
     return opener(req, *args, **kwargs)


-def spider(root, depth=0):
-    """Gets web pages from a root URL.
-
-    If depth is specified (e.g., depth=2), then this will also follow
-    up to <depth> levels of links from the root.
-
-    This will spawn processes to fetch the children, for much improved
-    performance over a sequential fetch.
-
-    """
-    root = url_util.parse(root)
-    pages, links = _spider(root, set(), root, 0, depth, False)
-    return pages, links
-
-
-def find_versions_of_archive(archive_urls, list_url=None, list_depth=0):
+def find_versions_of_archive(
+        archive_urls, list_url=None, list_depth=0, concurrency=32
+):
     """Scrape web pages for new versions of a tarball.

-    Arguments:
+    Args:
         archive_urls (str or list or tuple): URL or sequence of URLs for
             different versions of a package. Typically these are just the
             tarballs from the package file itself. By default, this searches
             the parent directories of archives.
-
-    Keyword Arguments:
         list_url (str or None): URL for a listing of archives.
             Spack will scrape these pages for download links that look
            like the archive URL.
-
-        list_depth (int): Max depth to follow links on list_url pages.
+        list_depth (int): max depth to follow links on list_url pages.
             Defaults to 0.
+        concurrency (int): maximum number of concurrent requests
     """
     if not isinstance(archive_urls, (list, tuple)):
         archive_urls = [archive_urls]
@@ -511,12 +512,7 @@ def find_versions_of_archive(archive_urls, list_url=None, list_depth=0):
     list_urls |= additional_list_urls

     # Grab some web pages to scrape.
-    pages = {}
-    links = set()
-    for lurl in list_urls:
-        pg, lnk = spider(lurl, depth=list_depth)
-        pages.update(pg)
-        links.update(lnk)
+    pages, links = spider(list_urls, depth=list_depth, concurrency=concurrency)

     # Scrape them for archive URLs
     regexes = []
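
A note on the small `star` adapter in the new `spider`: `ThreadPool.map`
passes exactly one argument to the worker function, and `starmap` only
exists on Python 3, so the per-URL arguments are packed into tuples and
unpacked by the wrapper. The same idea in isolation (a standalone sketch;
`fetch` is a placeholder, not Spack code):

    import multiprocessing.pool


    def star(func):
        """Adapt a multi-argument function for ThreadPool.map."""
        def _wrapper(args):
            return func(*args)
        return _wrapper


    def fetch(url, collect_nested):
        # placeholder worker standing in for the real _spider closure
        return (url, collect_nested)


    tp = multiprocessing.pool.ThreadPool(processes=2)
    try:
        work = [('https://a.example', True), ('https://b.example', False)]
        # each tuple is unpacked into fetch(url, collect_nested)
        print(tp.map(star(fetch), work))
    finally:
        tp.terminate()
        tp.join()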

File 5 of 5: the bash tab-completion script

@@ -1510,7 +1510,7 @@ _spack_verify() {
 _spack_versions() {
     if $list_options
     then
-        SPACK_COMPREPLY="-h --help -s --safe-only"
+        SPACK_COMPREPLY="-h --help -s --safe-only -c --concurrency"
     else
         _all_packages
     fi